From 0997a3bb3a541aaca5486432883c22c8e473cb94 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 26 Mar 2026 12:31:21 +0800 Subject: [PATCH 1/3] co-1 --- .../pipe/agent/task/subtask/PipeAbstractSinkSubtask.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java index ccf16bfa7534..cb4bbe47c982 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java @@ -163,10 +163,11 @@ public void onFailure(final Throwable throwable) { * @return {@code true} if the {@link PipeSubtask} should be stopped, {@code false} otherwise */ private boolean onPipeConnectionException(final Throwable throwable) { - LOGGER.warn( - "PipeConnectionException occurred, {} retries to handshake with the target system.", - outputPipeSink.getClass().getName(), - throwable); + PipeLogger.log( + LOGGER::warn, + throwable, + "PipeConnectionException occurred, %s retries to handshake with the target system.", + outputPipeSink.getClass().getName()); int retry = 0; while (retry < MAX_RETRY_TIMES) { From 4570e8f168cf685197ce6493956db17c94952ad0 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:46:46 +0800 Subject: [PATCH 2/3] by --- .../dataregion/DataExecutionVisitor.java | 14 +++++++------- .../plan/scheduler/AsyncPlanNodeSender.java | 14 ++++++++------ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java index ca8136584679..e0184b8595d2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java @@ -96,21 +96,21 @@ public TSStatus visitRelationalInsertTablet( } @Override - public TSStatus visitInsertTablet(InsertTabletNode node, DataRegion dataRegion) { + public TSStatus visitInsertTablet(final InsertTabletNode node, final DataRegion dataRegion) { try { dataRegion.insertTablet(node); dataRegion.insertSeparatorToWAL(); return StatusUtils.OK; - } catch (OutOfTTLException e) { - LOGGER.warn("Error in executing plan node: {}, caused by {}", node, e.getMessage()); + } catch (final OutOfTTLException e) { + LOGGER.debug("Error in executing plan node: {}, caused by {}", node, e.getMessage()); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); - } catch (WriteProcessRejectException e) { + } catch (final WriteProcessRejectException e) { LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, e.getMessage()); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); - } catch (WriteProcessException e) { + } catch (final WriteProcessException e) { LOGGER.error("Error in executing plan node: {}", node, e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); - } catch (BatchProcessException e) { + } catch (final BatchProcessException e) { LOGGER.warn( "Batch failure in executing a InsertTabletNode. device: {}, startTime: {}, measurements: {}, failing status: {}", node.getTargetPath(), @@ -119,7 +119,7 @@ public TSStatus visitInsertTablet(InsertTabletNode node, DataRegion dataRegion) e.getFailingStatus()); // For each error TSStatus firstStatus = null; - for (TSStatus status : e.getFailingStatus()) { + for (final TSStatus status : e.getFailingStatus()) { if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { firstStatus = status; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java index fc1e3d049d91..1bab463050eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java @@ -128,12 +128,14 @@ public List getFailedInstancesWithStatuses() { RpcUtils.getStatus( TSStatusCode.WRITE_PROCESS_ERROR, entry.getValue().getMessage()))); } else { - LOGGER.warn( - "dispatch write failed. status: {}, code: {}, message: {}, node {}", - entry.getValue().status, - TSStatusCode.representOf(status.code), - entry.getValue().message, - instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint()); + if (status.code != TSStatusCode.OUT_OF_TTL.getStatusCode()) { + LOGGER.warn( + "dispatch write failed. status: {}, code: {}, message: {}, node {}", + entry.getValue().status, + TSStatusCode.representOf(status.code), + entry.getValue().message, + instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint()); + } failureFragmentInstanceWithStatusList.add( new FailedFragmentInstanceWithStatus(instance, status)); } From 2cc58e7124fa4296061a8674178b76c5c7c28e7c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:53:03 +0800 Subject: [PATCH 3/3] fix --- .../thrift/IoTDBDataNodeReceiver.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 867e2487a508..254928e91910 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -830,16 +830,19 @@ private TSStatus executeStatementAndClassifyExceptions( final TSStatus result = executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch(statement); - if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + final int code = result.getCode(); + if (code == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { return result; } else { - PipeLogger.log( - LOGGER::warn, - "Receiver id = %s: Failure status encountered while executing statement %s: %s", - receiverId.get(), - statement.getPipeLoggingString(), - result); + if (code != TSStatusCode.OUT_OF_TTL.getStatusCode()) { + PipeLogger.log( + LOGGER::warn, + "Receiver id = %s: Failure status encountered while executing statement %s: %s", + receiverId.get(), + statement.getPipeLoggingString(), + result); + } return STATEMENT_STATUS_VISITOR.process(statement, result); } } catch (final Exception e) {