From 00962e625bd48503ee66453f396a9e8e8e93d7c9 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 29 Jun 2026 10:44:51 +0800 Subject: [PATCH 1/3] Pipe: improve incremental memory estimation --- .../agent/task/PipeDataNodeTaskAgent.java | 282 +++++++++++++++--- .../task/builder/PipeDataNodeTaskBuilder.java | 35 ++- .../subtask/sink/PipeSinkSubtaskManager.java | 96 +++--- .../pipe/agent/task/PipeTaskAgent.java | 26 +- 4 files changed, 339 insertions(+), 100 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 3509e6b29cef9..43b76f6917676 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.agent.task; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.concurrent.IoTThreadFactory; @@ -38,6 +39,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType; import org.apache.iotdb.commons.pipe.config.PipeConfig; @@ -46,6 +48,7 @@ import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -54,6 +57,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.agent.task.builder.PipeDataNodeBuilder; import org.apache.iotdb.db.pipe.agent.task.builder.PipeDataNodeTaskBuilder; +import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtaskManager; import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -192,6 +196,11 @@ protected void createPipeTask( // For internal source || needConstructDataRegionTask || needConstructSchemaRegionTask) { + calculateMemoryUsage( + pipeStaticMeta, + Collections.singletonList(new Pair<>(consensusGroupId, pipeTaskMeta)), + false); + final PipeDataNodeTask pipeTask = new PipeDataNodeTaskBuilder(pipeStaticMeta, consensusGroupId, pipeTaskMeta).build(); pipeTask.create(); @@ -779,25 +788,45 @@ public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final int c } @Override - protected void calculateMemoryUsage( - final PipeStaticMeta staticMeta, - final PipeParameters sourceParameters, - final PipeParameters processorParameters, - final PipeParameters sinkParameters) { + protected void calculateMemoryUsage(final PipeMeta pipeMetaFromCoordinator) + throws IllegalPathException { + final PipeStaticMeta staticMeta = pipeMetaFromCoordinator.getStaticMeta(); if (!PipeConfig.getInstance().isPipeEnableMemoryCheck() - || !isInnerSource(sourceParameters) + || !isInnerSource(staticMeta.getSourceParameters()) || !PipeType.USER.equals(staticMeta.getPipeType())) { return; } - calculateInsertNodeQueueMemory(sourceParameters); + final PipeMeta pipeMetaInAgent = pipeMetaKeeper.getPipeMeta(staticMeta.getPipeName()); + final boolean ignoreRegisteredSinkSubtasks = + Objects.nonNull(pipeMetaInAgent) + && (!pipeMetaInAgent.getStaticMeta().equals(staticMeta) + || PipeStatus.DROPPED.equals(pipeMetaInAgent.getRuntimeMeta().getStatus().get())); + calculateMemoryUsage( + staticMeta, + collectPipeTasksToBeCreated(pipeMetaFromCoordinator), + ignoreRegisteredSinkSubtasks); + } - long needMemory = 0; + private void calculateMemoryUsage( + final PipeStaticMeta staticMeta, + final List> pipeTasksToBeCreated, + final boolean ignoreRegisteredSinkSubtasks) + throws IllegalPathException { + if (!PipeConfig.getInstance().isPipeEnableMemoryCheck() + || !isInnerSource(staticMeta.getSourceParameters()) + || !PipeType.USER.equals(staticMeta.getPipeType()) + || pipeTasksToBeCreated.isEmpty()) { + return; + } - needMemory += calculateTsFileParserMemory(sourceParameters, sinkParameters); - needMemory += calculateSinkBatchMemory(sinkParameters); - needMemory += calculateSendTsFileReadBufferMemory(sourceParameters, sinkParameters); - needMemory += calculateAssignerMemory(sourceParameters); + final MemoryEstimation memoryEstimation = + calculateIncrementalMemoryUsage( + staticMeta, pipeTasksToBeCreated, ignoreRegisteredSinkSubtasks); + calculateInsertNodeQueueMemory( + staticMeta.getSourceParameters(), memoryEstimation.dataRegionTaskCount); + + final long needMemory = memoryEstimation.nonFloatingMemoryInBytes; PipeMemoryManager pipeMemoryManager = PipeDataNodeResourceManager.memory(); final long freeMemorySizeInBytes = pipeMemoryManager.getFreeMemorySizeInBytes(); @@ -819,6 +848,109 @@ protected void calculateMemoryUsage( } } + private List> collectPipeTasksToBeCreated( + final PipeMeta pipeMetaFromCoordinator) throws IllegalPathException { + final PipeStaticMeta pipeStaticMeta = pipeMetaFromCoordinator.getStaticMeta(); + final PipeParameters sourceParameters = pipeStaticMeta.getSourceParameters(); + final Set dataRegionIds = + new HashSet<>(StorageEngine.getInstance().getAllDataRegionIds()); + final Set schemaRegionIds = + new HashSet<>(SchemaEngine.getInstance().getAllSchemaRegionIds()); + final List> pipeTasksToBeCreated = new ArrayList<>(); + + for (final Map.Entry consensusGroupIdToPipeTaskMeta : + pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().entrySet()) { + final int consensusGroupId = consensusGroupIdToPipeTaskMeta.getKey(); + final PipeTaskMeta pipeTaskMeta = consensusGroupIdToPipeTaskMeta.getValue(); + if (pipeTaskMeta.getLeaderNodeId() != CONFIG.getDataNodeId()) { + continue; + } + + final boolean needConstructTask; + if (pipeStaticMeta.isSourceExternal()) { + needConstructTask = true; + } else { + final DataRegionId dataRegionId = new DataRegionId(consensusGroupId); + final boolean needConstructDataRegionTask = + dataRegionIds.contains(dataRegionId) + && DataRegionListeningFilter.shouldDataRegionBeListened( + sourceParameters, dataRegionId); + final boolean needConstructSchemaRegionTask = + schemaRegionIds.contains(new SchemaRegionId(consensusGroupId)) + && SchemaRegionListeningFilter.shouldSchemaRegionBeListened( + consensusGroupId, sourceParameters); + needConstructTask = needConstructDataRegionTask || needConstructSchemaRegionTask; + } + + if (needConstructTask) { + pipeTasksToBeCreated.add(new Pair<>(consensusGroupId, pipeTaskMeta)); + } + } + return pipeTasksToBeCreated; + } + + private MemoryEstimation calculateIncrementalMemoryUsage( + final PipeStaticMeta staticMeta, + final List> pipeTasksToBeCreated, + final boolean ignoreRegisteredSinkSubtasks) { + long needMemory = 0; + int dataRegionTaskCount = 0; + final Set sinkSubtasksToBeCreated = new HashSet<>(); + + for (final Pair regionIdAndTaskMeta : pipeTasksToBeCreated) { + final int regionId = regionIdAndTaskMeta.getLeft(); + final PipeTaskMeta pipeTaskMeta = regionIdAndTaskMeta.getRight(); + final PipeParameters sourceParameters = + PipeDataNodeTaskBuilder.blendUserAndSystemParameters( + staticMeta.getSourceParameters(), pipeTaskMeta); + final PipeParameters sinkParameters = + PipeDataNodeTaskBuilder.blendUserAndSystemParameters( + staticMeta.getSinkParameters(), pipeTaskMeta); + PipeDataNodeTaskBuilder.preprocessParameters(sourceParameters, sinkParameters); + + final boolean isDataRegionTask = isDataRegionTask(regionId); + if (isDataRegionTask) { + dataRegionTaskCount++; + needMemory += calculateTsFileParserMemory(sourceParameters, sinkParameters); + } + + final String sinkSubtaskId = + PipeSinkSubtaskManager.generateAttributeSortedString(sinkParameters, regionId); + if (isDataRegionTask + && !sinkSubtasksToBeCreated.contains(sinkSubtaskId) + && (ignoreRegisteredSinkSubtasks + || !PipeSinkSubtaskManager.instance() + .hasRegisteredSubtasks(sinkParameters, regionId))) { + sinkSubtasksToBeCreated.add(sinkSubtaskId); + final int sinkSubtaskNum = + PipeSinkSubtaskManager.calculateSinkSubtaskNum(sinkParameters, regionId); + needMemory += calculateSinkBatchMemory(sinkParameters) * sinkSubtaskNum; + needMemory += + calculateSendTsFileReadBufferMemory(sourceParameters, sinkParameters) * sinkSubtaskNum; + } + } + + if (dataRegionTaskCount > 0) { + needMemory += calculateAssignerMemory(staticMeta.getSourceParameters()); + } + return new MemoryEstimation(needMemory, dataRegionTaskCount); + } + + private boolean isDataRegionTask(final int regionId) { + return StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId)) + || PipeRuntimeMeta.isSourceExternal(regionId); + } + + private static class MemoryEstimation { + private final long nonFloatingMemoryInBytes; + private final int dataRegionTaskCount; + + private MemoryEstimation(final long nonFloatingMemoryInBytes, final int dataRegionTaskCount) { + this.nonFloatingMemoryInBytes = nonFloatingMemoryInBytes; + this.dataRegionTaskCount = dataRegionTaskCount; + } + } + private boolean isInnerSource(final PipeParameters sourceParameters) { final String pluginName = sourceParameters @@ -831,7 +963,11 @@ private boolean isInnerSource(final PipeParameters sourceParameters) { || pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName()); } - private void calculateInsertNodeQueueMemory(final PipeParameters sourceParameters) { + private void calculateInsertNodeQueueMemory( + final PipeParameters sourceParameters, final int dataRegionTaskCount) { + if (dataRegionTaskCount <= 0) { + return; + } // Realtime source is enabled by default, so we only need to check the source realtime if (!sourceParameters.getBooleanOrDefault( @@ -850,17 +986,17 @@ private void calculateInsertNodeQueueMemory(final PipeParameters sourceParameter return; } + final long needFloatingMemory = + PipeConfig.getInstance().getPipeInsertNodeQueueMemory() * dataRegionTaskCount; final long allocatedMemorySizeInBytes = this.getAllFloatingMemoryUsageInByte(); final long remainingMemory = PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes() - allocatedMemorySizeInBytes; - if (remainingMemory < PipeConfig.getInstance().getPipeInsertNodeQueueMemory()) { + if (remainingMemory < needFloatingMemory) { final String message = String.format( "%s Need Floating memory: %d bytes, free Floating memory: %d bytes", - PipeMessages.NOT_ENOUGH_MEMORY_FOR_PIPE, - PipeConfig.getInstance().getPipeInsertNodeQueueMemory(), - remainingMemory); + PipeMessages.NOT_ENOUGH_MEMORY_FOR_PIPE, needFloatingMemory, remainingMemory); LOGGER.warn(message); throw new PipeException(message); } @@ -939,32 +1075,108 @@ private long calculateTsFileParserMemory( } private long calculateSinkBatchMemory(final PipeParameters sinkParameters) { + final String format = + sinkParameters.getStringOrDefault( + Arrays.asList(PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), + PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE); + final boolean usingTsFileBatch = PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE.equals(format); - // If the sink format is tsfile , we need to use batch - boolean needUseBatch = - PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE.equals( - sinkParameters.getStringOrDefault( + // TsFile format always uses a batch. Other formats only use a batch when batch mode is enabled. + final boolean needUseBatch = + usingTsFileBatch + || sinkParameters.getBooleanOrDefault( Arrays.asList( - PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), - PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE)); + PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, + PipeSinkConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY), + PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE); - if (needUseBatch) { - return PipeConfig.getInstance().getSinkBatchMemoryTsFile(); + if (!needUseBatch) { + return 0; } - // If the sink is batch mode, we need to use batch - needUseBatch = - sinkParameters.getBooleanOrDefault( + final long batchSizeInBytes = + sinkParameters.getLongOrDefault( Arrays.asList( - PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, - PipeSinkConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY), - PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE); + PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, + PipeSinkConstant.SINK_IOTDB_BATCH_SIZE_KEY), + usingTsFileBatch + ? PipeSinkConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE + : PipeSinkConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE); - if (!needUseBatch) { - return 0; + return batchSizeInBytes * calculateBatchShardCount(sinkParameters, usingTsFileBatch); + } + + private long calculateBatchShardCount( + final PipeParameters sinkParameters, final boolean usingTsFileBatch) { + if (usingTsFileBatch + || !sinkParameters.getBooleanOrDefault( + Arrays.asList( + PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY, + PipeSinkConstant.SINK_LEADER_CACHE_ENABLE_KEY), + PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE)) { + return 1; } - return PipeConfig.getInstance().getSinkBatchMemoryInsertNode(); + // Plain batches always allocate the default batch and may lazily allocate one batch per target + // endpoint when leader cache splits events by endpoint. + return 1L + calculateTargetEndPointCount(sinkParameters); + } + + private int calculateTargetEndPointCount(final PipeParameters sinkParameters) { + final Set targetEndPoints = new HashSet<>(); + try { + addTargetEndPoint( + targetEndPoints, + sinkParameters, + PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY, + PipeSinkConstant.CONNECTOR_IOTDB_HOST_KEY, + PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY); + addTargetEndPoint( + targetEndPoints, + sinkParameters, + PipeSinkConstant.SINK_IOTDB_IP_KEY, + PipeSinkConstant.SINK_IOTDB_HOST_KEY, + PipeSinkConstant.SINK_IOTDB_PORT_KEY); + if (sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY)) { + targetEndPoints.addAll( + NodeUrlUtils.parseTEndPointUrls( + Arrays.asList( + sinkParameters + .getStringByKeys(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY) + .replace(" ", "") + .split(",")))); + } + if (sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_NODE_URLS_KEY)) { + targetEndPoints.addAll( + NodeUrlUtils.parseTEndPointUrls( + Arrays.asList( + sinkParameters + .getStringByKeys(PipeSinkConstant.SINK_IOTDB_NODE_URLS_KEY) + .replace(" ", "") + .split(",")))); + } + } catch (final Exception ignored) { + return 1; + } + return Math.max(1, targetEndPoints.size()); + } + + private void addTargetEndPoint( + final Set targetEndPoints, + final PipeParameters sinkParameters, + final String ipKey, + final String hostKey, + final String portKey) { + if (sinkParameters.hasAttribute(ipKey) && sinkParameters.hasAttribute(portKey)) { + targetEndPoints.add( + new TEndPoint( + sinkParameters.getStringByKeys(ipKey), sinkParameters.getIntByKeys(portKey))); + } + if (sinkParameters.hasAttribute(hostKey) && sinkParameters.hasAttribute(portKey)) { + targetEndPoints.add( + new TEndPoint( + sinkParameters.getStringByKeys(hostKey), sinkParameters.getIntByKeys(portKey))); + } } private long calculateSendTsFileReadBufferMemory( @@ -993,7 +1205,7 @@ private long calculateSendTsFileReadBufferMemory( return 0; } - return PipeConfig.getInstance().getSendTsFileReadBuffer(); + return PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); } private long calculateAssignerMemory(final PipeParameters sourceParameters) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java index 8747c12d076b6..878823d4696cc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java @@ -67,14 +67,11 @@ public class PipeDataNodeTaskBuilder { private static final PipeProcessorSubtaskExecutor PROCESSOR_EXECUTOR = PipeSubtaskExecutorManager.getInstance().getProcessorExecutor(); - protected final Map systemParameters = new HashMap<>(); - public PipeDataNodeTaskBuilder( final PipeStaticMeta pipeStaticMeta, final int regionId, final PipeTaskMeta pipeTaskMeta) { this.pipeStaticMeta = pipeStaticMeta; this.regionId = regionId; this.pipeTaskMeta = pipeTaskMeta; - generateSystemParameters(); } public PipeDataNodeTask build() { @@ -82,11 +79,10 @@ public PipeDataNodeTask build() { // Analyzes the PipeParameters to identify potential conflicts. final PipeParameters sourceParameters = - blendUserAndSystemParameters(pipeStaticMeta.getSourceParameters()); + blendUserAndSystemParameters(pipeStaticMeta.getSourceParameters(), pipeTaskMeta); final PipeParameters sinkParameters = - blendUserAndSystemParameters(pipeStaticMeta.getSinkParameters()); - checkConflict(sourceParameters, sinkParameters); - injectParameters(sourceParameters, sinkParameters); + blendUserAndSystemParameters(pipeStaticMeta.getSinkParameters(), pipeTaskMeta); + preprocessParameters(sourceParameters, sinkParameters); // We first build the source and sink, then build the processor. final PipeTaskSourceStage sourceStage = @@ -125,7 +121,7 @@ public PipeDataNodeTask build() { new PipeTaskProcessorStage( pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime(), - blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()), + blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters(), pipeTaskMeta), regionId, sourceStage.getEventSupplier(), sinkStage.getPipeSinkPendingQueue(), @@ -143,22 +139,25 @@ public PipeDataNodeTask build() { pipeStaticMeta.getPipeName(), regionId, sourceStage, processorStage, sinkStage); } - private void generateSystemParameters() { + public static PipeParameters blendUserAndSystemParameters( + final PipeParameters userParameters, final PipeTaskMeta pipeTaskMeta) { + // Deep copy the user parameters to avoid modification of the original parameters. + // If the original parameters are modified, progress index report will be affected. + final Map blendedParameters = new HashMap<>(userParameters.getAttribute()); if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) || pipeTaskMeta.isNewlyAdded()) { - systemParameters.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, Boolean.TRUE.toString()); + blendedParameters.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, Boolean.TRUE.toString()); } + return new PipeParameters(blendedParameters); } - private PipeParameters blendUserAndSystemParameters(final PipeParameters userParameters) { - // Deep copy the user parameters to avoid modification of the original parameters. - // If the original parameters are modified, progress index report will be affected. - final Map blendedParameters = new HashMap<>(userParameters.getAttribute()); - blendedParameters.putAll(systemParameters); - return new PipeParameters(blendedParameters); + public static void preprocessParameters( + final PipeParameters sourceParameters, final PipeParameters sinkParameters) { + checkConflict(sourceParameters, sinkParameters); + injectParameters(sourceParameters, sinkParameters); } - private void checkConflict( + private static void checkConflict( final PipeParameters sourceParameters, final PipeParameters sinkParameters) { final Pair insertionDeletionListeningOptionPair; final boolean shouldTerminatePipeOnAllHistoricalEventsConsumed; @@ -228,7 +227,7 @@ private void checkConflict( } } - private void injectParameters( + private static void injectParameters( final PipeParameters sourceParameters, final PipeParameters sinkParameters) { final boolean isSourceExternal = !BuiltinPipePlugin.BUILTIN_SOURCES.contains( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index cca6de44f0bad..33c89aa896760 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -69,13 +69,7 @@ public synchronized String register( final Supplier executorSupplier, final PipeParameters pipeSinkParameters, final PipeTaskSinkRuntimeEnvironment environment) { - final String connectorName = - PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters); - final String connectorKey = - connectorName - // Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase - // for matching in `CONNECTOR_CONSTRUCTORS` - .toLowerCase(); + final String connectorKey = getConnectorKey(pipeSinkParameters); PipeEventCommitManager.getInstance() .register( environment.getPipeName(), @@ -83,48 +77,23 @@ public synchronized String register( environment.getRegionId(), connectorKey); - final boolean isDataRegionSink = - StorageEngine.getInstance() - .getAllDataRegionIds() - .contains(new DataRegionId(environment.getRegionId())) - || PipeRuntimeMeta.isSourceExternal(environment.getRegionId()); - - final int sinkNum; + final boolean isDataRegionSink = isDataRegionSink(environment.getRegionId()); + final int sinkNum = calculateSinkSubtaskNum(pipeSinkParameters, isDataRegionSink, connectorKey); boolean realTimeFirst = false; - boolean serializeByRegion = false; - String attributeSortedString = generateAttributeSortedString(pipeSinkParameters); + final String attributeSortedString = + generateAttributeSortedString(pipeSinkParameters, environment.getRegionId()); final String attributeDisplayString = generateAttributeDisplayString(pipeSinkParameters); if (isDataRegionSink) { - serializeByRegion = PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters); - sinkNum = - serializeByRegion - ? 1 - : pipeSinkParameters.getIntOrDefault( - Arrays.asList( - PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, - PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY), - PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey) - ? 1 - : PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE); realTimeFirst = pipeSinkParameters.getBooleanOrDefault( Arrays.asList( PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, PipeSinkConstant.SINK_REALTIME_FIRST_KEY), PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE); - attributeSortedString = - serializeByRegion - ? "data_region_" + environment.getRegionId() + "_" + attributeSortedString - : "data_" + attributeSortedString; - } else { - // Do not allow parallel tasks for schema region connectors - // to avoid the potential disorder of the schema region data transfer - sinkNum = 1; - attributeSortedString = "schema_" + attributeSortedString; } final String attributeDisplayStringWithPrefix = isDataRegionSink - ? serializeByRegion + ? PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters) ? "data_region_" + environment.getRegionId() + "_" + attributeDisplayString : "data_" + attributeDisplayString : "schema_" + attributeDisplayString; @@ -285,6 +254,59 @@ public UnboundedBlockingPendingQueue getPipeSinkPendingQueue( .getPendingQueue(); } + public synchronized boolean hasRegisteredSubtasks( + final PipeParameters pipeSinkParameters, final int regionId) { + return attributeSortedString2SubtaskLifeCycleMap.containsKey( + generateAttributeSortedString(pipeSinkParameters, regionId)); + } + + public static int calculateSinkSubtaskNum( + final PipeParameters pipeSinkParameters, final int regionId) { + final String connectorKey = getConnectorKey(pipeSinkParameters); + return calculateSinkSubtaskNum(pipeSinkParameters, isDataRegionSink(regionId), connectorKey); + } + + public static String generateAttributeSortedString( + final PipeParameters pipeSinkParameters, final int regionId) { + final String attributeSortedString = generateAttributeSortedString(pipeSinkParameters); + if (isDataRegionSink(regionId)) { + return PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters) + ? "data_region_" + regionId + "_" + attributeSortedString + : "data_" + attributeSortedString; + } + return "schema_" + attributeSortedString; + } + + private static String getConnectorKey(final PipeParameters pipeSinkParameters) { + return PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters).toLowerCase(); + } + + private static boolean isDataRegionSink(final int regionId) { + return StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId)) + || PipeRuntimeMeta.isSourceExternal(regionId); + } + + private static int calculateSinkSubtaskNum( + final PipeParameters pipeSinkParameters, + final boolean isDataRegionSink, + final String connectorKey) { + if (!isDataRegionSink) { + // Do not allow parallel tasks for schema region connectors to avoid the potential disorder of + // the schema region data transfer. + return 1; + } + if (PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters)) { + return 1; + } + return pipeSinkParameters.getIntOrDefault( + Arrays.asList( + PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, + PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY), + PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey) + ? 1 + : PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE); + } + private static String generateAttributeSortedString( final PipeParameters pipeConnectorParameters) { final TreeMap sortedStringSourceMap = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index c30a50ac4958a..48307d3b96f75 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -508,18 +508,15 @@ protected boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws Ille final String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName(); final long creationTime = pipeMetaFromCoordinator.getStaticMeta().getCreationTime(); - calculateMemoryUsage( - pipeMetaFromCoordinator.getStaticMeta(), - pipeMetaFromCoordinator.getStaticMeta().getSourceParameters(), - pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters(), - pipeMetaFromCoordinator.getStaticMeta().getSinkParameters()); - final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); - if (existedPipeMeta != null) { - if (!checkBeforeCreatePipe(existedPipeMeta, pipeName, creationTime)) { - return false; - } + if (existedPipeMeta != null + && !checkBeforeCreatePipe(existedPipeMeta, pipeName, creationTime)) { + return false; + } + calculateMemoryUsage(pipeMetaFromCoordinator); + + if (existedPipeMeta != null) { // Drop the pipe if // 1. The pipe with the same name but with different creation time has been created before // 2. The pipe with the same name and the same creation time has been dropped before, but the @@ -552,6 +549,15 @@ protected boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws Ille return needToStartPipe; } + protected void calculateMemoryUsage(final PipeMeta pipeMetaFromCoordinator) + throws IllegalPathException { + calculateMemoryUsage( + pipeMetaFromCoordinator.getStaticMeta(), + pipeMetaFromCoordinator.getStaticMeta().getSourceParameters(), + pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters(), + pipeMetaFromCoordinator.getStaticMeta().getSinkParameters()); + } + protected void calculateMemoryUsage( final PipeStaticMeta staticMeta, final PipeParameters extractorParameters, From f13df43f8833d8418ade6cb2a612493df2e59dcb Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 29 Jun 2026 12:10:42 +0800 Subject: [PATCH 2/3] Pipe: add memory estimation unit tests --- .../agent/task/PipeDataNodeTaskAgent.java | 18 +- .../agent/task/PipeDataNodeTaskAgentTest.java | 159 ++++++++++++++++++ .../builder/PipeDataNodeTaskBuilderTest.java | 111 ++++++++++++ .../sink/PipeSinkSubtaskManagerTest.java | 95 +++++++++++ 4 files changed, 376 insertions(+), 7 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgentTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilderTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManagerTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 43b76f6917676..f2a9ddc854550 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -815,8 +815,12 @@ private void calculateMemoryUsage( throws IllegalPathException { if (!PipeConfig.getInstance().isPipeEnableMemoryCheck() || !isInnerSource(staticMeta.getSourceParameters()) - || !PipeType.USER.equals(staticMeta.getPipeType()) - || pipeTasksToBeCreated.isEmpty()) { + || !PipeType.USER.equals(staticMeta.getPipeType())) { + return; + } + + if (pipeTasksToBeCreated.isEmpty()) { + calculateInsertNodeQueueMemory(staticMeta.getSourceParameters(), 1); return; } @@ -1074,7 +1078,7 @@ private long calculateTsFileParserMemory( return PipeConfig.getInstance().getTsFileParserMemory(); } - private long calculateSinkBatchMemory(final PipeParameters sinkParameters) { + private static long calculateSinkBatchMemory(final PipeParameters sinkParameters) { final String format = sinkParameters.getStringOrDefault( Arrays.asList(PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), @@ -1106,7 +1110,7 @@ private long calculateSinkBatchMemory(final PipeParameters sinkParameters) { return batchSizeInBytes * calculateBatchShardCount(sinkParameters, usingTsFileBatch); } - private long calculateBatchShardCount( + private static long calculateBatchShardCount( final PipeParameters sinkParameters, final boolean usingTsFileBatch) { if (usingTsFileBatch || !sinkParameters.getBooleanOrDefault( @@ -1122,7 +1126,7 @@ private long calculateBatchShardCount( return 1L + calculateTargetEndPointCount(sinkParameters); } - private int calculateTargetEndPointCount(final PipeParameters sinkParameters) { + private static int calculateTargetEndPointCount(final PipeParameters sinkParameters) { final Set targetEndPoints = new HashSet<>(); try { addTargetEndPoint( @@ -1161,7 +1165,7 @@ private int calculateTargetEndPointCount(final PipeParameters sinkParameters) { return Math.max(1, targetEndPoints.size()); } - private void addTargetEndPoint( + private static void addTargetEndPoint( final Set targetEndPoints, final PipeParameters sinkParameters, final String ipKey, @@ -1179,7 +1183,7 @@ private void addTargetEndPoint( } } - private long calculateSendTsFileReadBufferMemory( + private static long calculateSendTsFileReadBufferMemory( final PipeParameters sourceParameters, final PipeParameters sinkParameters) { // If the source is history enable, we need to transfer tsfile boolean needTransferTsFile = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgentTest.java new file mode 100644 index 0000000000000..0582a94170ddf --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgentTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.task; + +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; +import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; +import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +public class PipeDataNodeTaskAgentTest { + + @Test + public void testCreateMemoryCheckStillRunsWhenNoPipeTasksNeedToBeCreated() throws Exception { + final boolean originalPipeEnableMemoryCheck = + CommonDescriptor.getInstance().getConfig().isPipeEnableMemoryChecked(); + final long originalPipeInsertNodeQueueMemory = + CommonDescriptor.getInstance().getConfig().getPipeInsertNodeQueueMemory(); + final double originalPipeTotalFloatingMemoryProportion = + CommonDescriptor.getInstance().getConfig().getPipeTotalFloatingMemoryProportion(); + + try { + CommonDescriptor.getInstance().getConfig().setIsPipeEnableMemoryChecked(true); + CommonDescriptor.getInstance().getConfig().setPipeInsertNodeQueueMemory(1); + CommonDescriptor.getInstance().getConfig().setPipeTotalFloatingMemoryProportion(0); + + Assert.assertThrows( + PipeException.class, + () -> + PipeDataNodeAgent.task() + .calculateMemoryUsage( + new PipeMeta( + new PipeStaticMeta( + "p", 1L, new HashMap<>(), new HashMap<>(), new HashMap<>()), + new PipeRuntimeMeta()))); + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setIsPipeEnableMemoryChecked(originalPipeEnableMemoryCheck); + CommonDescriptor.getInstance() + .getConfig() + .setPipeInsertNodeQueueMemory(originalPipeInsertNodeQueueMemory); + CommonDescriptor.getInstance() + .getConfig() + .setPipeTotalFloatingMemoryProportion(originalPipeTotalFloatingMemoryProportion); + } + } + + @Test + public void testPlainBatchMemoryIncludesLeaderCacheEndpointShards() throws Exception { + final Map sinkAttributes = new HashMap<>(); + sinkAttributes.put( + PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE); + sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1024"); + sinkAttributes.put( + PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6667, 127.0.0.2:6667"); + sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY, "127.0.0.3"); + sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY, "6667"); + + Assert.assertEquals( + 4 * 1024L, invokeCalculateSinkBatchMemory(new PipeParameters(sinkAttributes))); + + sinkAttributes.put( + PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY, Boolean.FALSE.toString()); + Assert.assertEquals(1024L, invokeCalculateSinkBatchMemory(new PipeParameters(sinkAttributes))); + } + + @Test + public void testTsFileBatchMemoryIgnoresLeaderCacheEndpointShards() throws Exception { + final Map sinkAttributes = new HashMap<>(); + sinkAttributes.put( + PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE); + sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "2048"); + sinkAttributes.put( + PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6667,127.0.0.2:6667"); + + Assert.assertEquals(2048L, invokeCalculateSinkBatchMemory(new PipeParameters(sinkAttributes))); + } + + @Test + public void testPlainBatchMemoryReturnsZeroWhenBatchModeIsDisabled() throws Exception { + final Map sinkAttributes = new HashMap<>(); + sinkAttributes.put( + PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE); + sinkAttributes.put( + PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, Boolean.FALSE.toString()); + sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1024"); + + Assert.assertEquals(0L, invokeCalculateSinkBatchMemory(new PipeParameters(sinkAttributes))); + } + + @Test + public void testSendTsFileReadBufferMemoryUsesSinkReadFileBufferSize() throws Exception { + final Map sourceAttributes = new HashMap<>(); + sourceAttributes.put(PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY, Boolean.FALSE.toString()); + + final Map sinkAttributes = new HashMap<>(); + sinkAttributes.put( + PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE); + Assert.assertEquals( + 0L, + invokeCalculateSendTsFileReadBufferMemory( + new PipeParameters(sourceAttributes), new PipeParameters(sinkAttributes))); + + sinkAttributes.put( + PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE); + Assert.assertEquals( + PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + invokeCalculateSendTsFileReadBufferMemory( + new PipeParameters(sourceAttributes), new PipeParameters(sinkAttributes))); + } + + private long invokeCalculateSinkBatchMemory(final PipeParameters sinkParameters) + throws Exception { + final Method method = + PipeDataNodeTaskAgent.class.getDeclaredMethod( + "calculateSinkBatchMemory", PipeParameters.class); + method.setAccessible(true); + return (long) method.invoke(null, sinkParameters); + } + + private long invokeCalculateSendTsFileReadBufferMemory( + final PipeParameters sourceParameters, final PipeParameters sinkParameters) throws Exception { + final Method method = + PipeDataNodeTaskAgent.class.getDeclaredMethod( + "calculateSendTsFileReadBufferMemory", PipeParameters.class, PipeParameters.class); + method.setAccessible(true); + return (long) method.invoke(null, sourceParameters, sinkParameters); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilderTest.java new file mode 100644 index 0000000000000..e00f9500bbef7 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilderTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.task.builder; + +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; +import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class PipeDataNodeTaskBuilderTest { + + @Test + public void testBlendUserAndSystemParametersDoesNotMutateOriginal() { + final Map attributes = new HashMap<>(); + attributes.put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, "root.sg.**"); + final PipeParameters userParameters = new PipeParameters(attributes); + + final PipeParameters blendedParameters = + PipeDataNodeTaskBuilder.blendUserAndSystemParameters( + userParameters, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)); + + Assert.assertEquals( + "root.sg.**", blendedParameters.getStringByKeys(PipeSourceConstant.EXTRACTOR_PATTERN_KEY)); + Assert.assertFalse(blendedParameters.hasAttribute(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY)); + Assert.assertFalse(userParameters.hasAttribute(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY)); + } + + @Test + public void testBlendUserAndSystemParametersMarksRestartOrNewlyAddedTask() { + final PipeParameters restartedParameters = + PipeDataNodeTaskBuilder.blendUserAndSystemParameters( + new PipeParameters(new HashMap<>()), + new PipeTaskMeta(new SimpleProgressIndex(1, 2L), 1)); + + Assert.assertEquals( + Boolean.TRUE.toString(), + restartedParameters.getStringByKeys(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY)); + + final PipeTaskMeta newlyAddedTaskMeta = + new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1).markAsNewlyAdded(); + final PipeParameters newlyAddedParameters = + PipeDataNodeTaskBuilder.blendUserAndSystemParameters( + new PipeParameters(new HashMap<>()), newlyAddedTaskMeta); + + Assert.assertEquals( + Boolean.TRUE.toString(), + newlyAddedParameters.getStringByKeys(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY)); + } + + @Test + public void testPreprocessParametersInjectsRuntimeDefaults() { + final Map sourceAttributes = new HashMap<>(); + sourceAttributes.put(PipeSourceConstant.EXTRACTOR_INCLUSION_KEY, "data.delete"); + final PipeParameters sourceParameters = new PipeParameters(sourceAttributes); + final PipeParameters sinkParameters = new PipeParameters(new HashMap<>()); + + PipeDataNodeTaskBuilder.preprocessParameters(sourceParameters, sinkParameters); + + Assert.assertEquals( + Boolean.FALSE.toString(), + sinkParameters.getStringByKeys(PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY)); + Assert.assertEquals( + Boolean.TRUE.toString(), + sinkParameters.getStringByKeys(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT)); + } + + @Test + public void testPreprocessParametersInjectsEventUserForExternalWriteBackSink() { + final Map sourceAttributes = new HashMap<>(); + sourceAttributes.put(PipeSourceConstant.EXTRACTOR_KEY, "external-source"); + + final Map sinkAttributes = new HashMap<>(); + sinkAttributes.put( + PipeSinkConstant.CONNECTOR_KEY, BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName()); + final PipeParameters sinkParameters = new PipeParameters(sinkAttributes); + + PipeDataNodeTaskBuilder.preprocessParameters( + new PipeParameters(sourceAttributes), sinkParameters); + + Assert.assertEquals( + Boolean.TRUE.toString(), + sinkParameters.getStringByKeys(PipeSinkConstant.CONNECTOR_USE_EVENT_USER_NAME_KEY)); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManagerTest.java new file mode 100644 index 0000000000000..9545d71d96569 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManagerTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.task.subtask.sink; + +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class PipeSinkSubtaskManagerTest { + + @Test + public void testGenerateAttributeSortedStringAddsRegionPrefixAndIgnoresRestartFlag() { + final Map attributes = new HashMap<>(); + attributes.put("z", "1"); + attributes.put("a", "2"); + attributes.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, Boolean.TRUE.toString()); + + Assert.assertEquals( + "data_region_-1_{a=2, z=1}", + PipeSinkSubtaskManager.generateAttributeSortedString( + new PipeParameters(new HashMap<>(attributes)), -1)); + + attributes.put(PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, Boolean.FALSE.toString()); + Assert.assertEquals( + "data_{a=2, connector.serialize-by-region=false, z=1}", + PipeSinkSubtaskManager.generateAttributeSortedString( + new PipeParameters(new HashMap<>(attributes)), -1)); + } + + @Test + public void testCalculateSinkSubtaskNumForDataRegionSink() { + final Map parallelAttributes = new HashMap<>(); + parallelAttributes.put( + PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, Boolean.FALSE.toString()); + parallelAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, "3"); + Assert.assertEquals( + 3, + PipeSinkSubtaskManager.calculateSinkSubtaskNum(new PipeParameters(parallelAttributes), -1)); + + final Map serializedAttributes = new HashMap<>(); + serializedAttributes.put( + PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, Boolean.TRUE.toString()); + serializedAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, "3"); + Assert.assertEquals( + 1, + PipeSinkSubtaskManager.calculateSinkSubtaskNum( + new PipeParameters(serializedAttributes), -1)); + } + + @Test + public void testCalculateSinkSubtaskNumUsesSingleThreadDefaultSinkAndSchemaRegionLimit() { + final Map singleThreadAttributes = new HashMap<>(); + singleThreadAttributes.put( + PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, Boolean.FALSE.toString()); + singleThreadAttributes.put( + PipeSinkConstant.CONNECTOR_KEY, BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName()); + Assert.assertEquals( + 1, + PipeSinkSubtaskManager.calculateSinkSubtaskNum( + new PipeParameters(singleThreadAttributes), -1)); + + final Map schemaRegionAttributes = new HashMap<>(); + schemaRegionAttributes.put( + PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, Boolean.FALSE.toString()); + schemaRegionAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, "3"); + Assert.assertEquals( + 1, + PipeSinkSubtaskManager.calculateSinkSubtaskNum( + new PipeParameters(schemaRegionAttributes), Integer.MAX_VALUE)); + } +} From 82785393f2077191683c2d258a442c5e7408c9de Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 1 Jul 2026 11:01:11 +0800 Subject: [PATCH 3/3] Pipe: localize memory check messages --- .../iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java | 8 ++++---- .../en/org/apache/iotdb/commons/i18n/PipeMessages.java | 7 +++++++ .../zh/org/apache/iotdb/commons/i18n/PipeMessages.java | 6 ++++++ 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index f2a9ddc854550..b28fdf22d98d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -841,8 +841,7 @@ private void calculateMemoryUsage( if (freeMemorySizeInBytes < needMemory + reservedMemorySizeInBytes) { final String message = String.format( - "%s Need memory: %d bytes, free memory: %d bytes, reserved memory: %d bytes, total memory: %d bytes", - PipeMessages.NOT_ENOUGH_MEMORY_FOR_PIPE, + PipeMessages.NOT_ENOUGH_MEMORY_FOR_PIPE_FORMAT, needMemory, freeMemorySizeInBytes, reservedMemorySizeInBytes, @@ -999,8 +998,9 @@ private void calculateInsertNodeQueueMemory( if (remainingMemory < needFloatingMemory) { final String message = String.format( - "%s Need Floating memory: %d bytes, free Floating memory: %d bytes", - PipeMessages.NOT_ENOUGH_MEMORY_FOR_PIPE, needFloatingMemory, remainingMemory); + PipeMessages.NOT_ENOUGH_FLOATING_MEMORY_FOR_PIPE_FORMAT, + needFloatingMemory, + remainingMemory); LOGGER.warn(message); throw new PipeException(message); } diff --git a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java index f0086f2689855..171d3689f0fca 100644 --- a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java @@ -256,6 +256,13 @@ public final class PipeMessages { // ===================== PipeTaskAgent ===================== public static final String NOT_ENOUGH_MEMORY_FOR_PIPE = "Not enough memory for pipe."; + public static final String NOT_ENOUGH_MEMORY_FOR_PIPE_FORMAT = + NOT_ENOUGH_MEMORY_FOR_PIPE + + " Need memory: %d bytes, free memory: %d bytes, " + + "reserved memory: %d bytes, total memory: %d bytes"; + public static final String NOT_ENOUGH_FLOATING_MEMORY_FOR_PIPE_FORMAT = + NOT_ENOUGH_MEMORY_FOR_PIPE + + " Need floating memory: %d bytes, free floating memory: %d bytes"; public static final String UNKNOWN_PIPE_STATUS = "Unknown pipe status %s for pipe %s"; public static final String UNEXPECTED_PIPE_STATUS = "Unexpected pipe status %s: "; public static final String INTERRUPTED_ACQUIRING_READ_LOCK = diff --git a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java index d081a8fd4cf89..1ac0e021bfad0 100644 --- a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java @@ -255,6 +255,12 @@ public final class PipeMessages { // ===================== PipeTaskAgent ===================== public static final String NOT_ENOUGH_MEMORY_FOR_PIPE = "pipe 内存不足。"; + public static final String NOT_ENOUGH_MEMORY_FOR_PIPE_FORMAT = + NOT_ENOUGH_MEMORY_FOR_PIPE + + "需要内存:%d 字节,空闲内存:%d 字节,保留内存:%d 字节," + + "总内存:%d 字节"; + public static final String NOT_ENOUGH_FLOATING_MEMORY_FOR_PIPE_FORMAT = + NOT_ENOUGH_MEMORY_FOR_PIPE + "需要浮动内存:%d 字节,空闲浮动内存:%d 字节"; public static final String UNKNOWN_PIPE_STATUS = "Pipe %s 的状态 %s 未知"; public static final String UNEXPECTED_PIPE_STATUS = "意外的 pipe 状态 %s:"; public static final String INTERRUPTED_ACQUIRING_READ_LOCK =