From 0a13d70c743395ab4a8143226d54ddde3b649c68 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Wed, 25 Mar 2026 17:15:02 +0800 Subject: [PATCH 1/4] Fix kill query doesn't take effect bug --- .../thrift/impl/ClientRPCServiceImpl.java | 25 ++++--- .../queryengine/common/MPPQueryContext.java | 11 ++- .../db/queryengine/plan/Coordinator.java | 3 +- .../plan/execution/IQueryExecution.java | 10 ++- .../plan/execution/QueryExecution.java | 69 ++++++++++--------- .../execution/config/ConfigExecution.java | 22 ++++-- .../operator/MergeSortOperatorTest.java | 6 +- 7 files changed, 86 insertions(+), 60 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index b5edafec160c2..d9913bf42c4c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -205,6 +205,7 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize; +import static org.apache.iotdb.rpc.TSStatusCode.QUERY_WAS_KILLED; public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @@ -241,6 +242,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + private static final String NO_QUERY_EXECUTION_ERR_MSG = + "Query is not found, it may be killed by others, timeout or some other runtime errors, you can see more details in server log."; + @FunctionalInterface public interface SelectResult { boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize) @@ -1147,15 +1151,18 @@ public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) { finished = true; return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus()); } - TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); queryExecution = COORDINATOR.getQueryExecution(req.queryId); if (queryExecution == null) { - resp.setHasResultSet(false); - resp.setMoreData(false); - return resp; + TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode()); + noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG); + return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus); } + + TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); + + queryExecution.updateCurrentRpcStartTime(startTime); statementType = queryExecution.getStatementType(); try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { @@ -1686,16 +1693,16 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) { return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus()); } - TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); - queryExecution = COORDINATOR.getQueryExecution(req.queryId); if (queryExecution == null) { - resp.setHasResultSet(false); - resp.setMoreData(true); - return resp; + TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode()); + noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG); + return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus); } + queryExecution.updateCurrentRpcStartTime(startTime); statementType = queryExecution.getStatementType(); + TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { Pair pair = convertTsBlockByFetchSize(queryExecution, req.fetchSize); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index fc457ab9ce487..d99ab5edd8195 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; @@ -95,6 +96,7 @@ public class MPPQueryContext { private boolean userQuery = false; + @TestOnly public MPPQueryContext(QueryId queryId) { this.queryId = queryId; this.endPointBlackList = ConcurrentHashMap.newKeySet(); @@ -102,19 +104,14 @@ public MPPQueryContext(QueryId queryId) { new NotThreadSafeMemoryReservationManager(queryId, this.getClass().getName()); } - // TODO too many callers just pass a null SessionInfo which should be forbidden + @TestOnly public MPPQueryContext( String sql, QueryId queryId, SessionInfo session, TEndPoint localDataBlockEndpoint, TEndPoint localInternalEndpoint) { - this(queryId); - this.sql = sql; - this.session = session; - this.localDataBlockEndpoint = localDataBlockEndpoint; - this.localInternalEndpoint = localInternalEndpoint; - this.initResultNodeContext(); + this(sql, queryId, -1, session, localDataBlockEndpoint, localInternalEndpoint); } public MPPQueryContext( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index ed76b837e801d..d9e9e4477b9b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -267,12 +267,11 @@ public QueryId createQueryId() { public void cleanupQueryExecution( Long queryId, org.apache.thrift.TBase nativeApiRequest, Throwable t) { - IQueryExecution queryExecution = getQueryExecution(queryId); + IQueryExecution queryExecution = queryExecutionMap.remove(queryId); if (queryExecution != null) { try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) { LOGGER.debug("[CleanUpQuery]]"); queryExecution.stopAndCleanup(t); - queryExecutionMap.remove(queryId); if (queryExecution.isQuery() && queryExecution.isUserQuery()) { long costTime = queryExecution.getTotalExecutionTime(); // print slow query diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java index 5cb2d4b449cd5..6726a37d59f21 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java @@ -33,8 +33,6 @@ public interface IQueryExecution { void stop(Throwable t); - void stopAndCleanup(); - void stopAndCleanup(Throwable t); void cancel(); @@ -61,6 +59,14 @@ public interface IQueryExecution { void recordExecutionTime(long executionTime); + /** + * update current rpc start time, which is used to calculate rpc execution time and update total + * execution time + * + * @param startTime start time of current rpc, time unit is ns + */ + void updateCurrentRpcStartTime(long startTime); + /** * @return cost time in ns */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 0ae403452683a..6572a39b5ef92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle; import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet; +import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.execution.memory.MemorySourceHandle; @@ -104,9 +105,14 @@ public class QueryExecution implements IQueryExecution { private final AtomicBoolean stopped; - // cost time in ns + // cost time in ns of finished rpc private long totalExecutionTime = 0; + // -1 if previous rpc is finished and next client req hasn't come yet, unit is ns + // it will be updated in fetchResult rpc + // protected by synchronized(this) + private long startTimeOfCurrentRpc = System.nanoTime(); + private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET = QueryExecutionMetricSet.getInstance(); private static final QueryPlanCostMetricSet QUERY_PLAN_COST_METRIC_SET = @@ -129,16 +135,16 @@ public QueryExecution(IPlanner planner, MPPQueryContext context, ExecutorService if (!state.isDone()) { return; } - // TODO: (xingtanzjr) If the query is in abnormal state, the releaseResource() should be - // invoked + Throwable cause = null; if (state == QueryState.FAILED || state == QueryState.ABORTED || state == QueryState.CANCELED) { LOGGER.debug("[ReleaseQueryResource] state is: {}", state); - Throwable cause = stateMachine.getFailureException(); + cause = stateMachine.getFailureException(); releaseResource(cause); } - this.stop(null); + this.stop(cause); + this.cleanUpCoordinatorContextMapIfNeeded(cause); } }); this.stopped = new AtomicBoolean(false); @@ -324,12 +330,6 @@ public void stop(Throwable t) { } } - // Stop the query and clean up all the resources this query occupied - public void stopAndCleanup() { - stop(null); - releaseResource(); - } - @Override public void cancel() { stateMachine.transitionToCanceled( @@ -338,27 +338,12 @@ public void cancel() { .setMessage(KilledByOthersException.MESSAGE)); } - /** Release the resources that current QueryExecution hold. */ - private void releaseResource() { - // close ResultHandle to unblock client's getResult request - // Actually, we should not close the ResultHandle when the QueryExecution is Finished. - // There are only two scenarios where the ResultHandle should be closed: - // 1. The client fetch all the result and the ResultHandle is finished. - // 2. The client's connection is closed that all owned QueryExecution should be cleaned up - // If the QueryExecution's state is abnormal, we should also abort the resultHandle without - // waiting it to be finished. - if (resultHandle != null) { - resultHandle.close(); - cleanUpResultHandle(); - } - } - private void cleanUpResultHandle() { // Result handle belongs to special fragment instance, so we need to deregister it alone // We don't need to deal with MemorySourceHandle because it doesn't register to memory pool // We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream // FragmentInstanceId to register - if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle instanceof SourceHandle) { + if (resultHandle instanceof SourceHandle) { TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId(); MPPDataExchangeService.getInstance() .getMPPDataExchangeManager() @@ -385,13 +370,26 @@ private void releaseResource(Throwable t) { // 2. The client's connection is closed that all owned QueryExecution should be cleaned up // If the QueryExecution's state is abnormal, we should also abort the resultHandle without // waiting it to be finished. - if (resultHandle != null) { + if (resultHandle != null && resultHandleCleanUp.compareAndSet(false, true)) { if (t != null) { resultHandle.abort(t); } else { resultHandle.close(); } cleanUpResultHandle(); + resultHandle = null; + } + } + + /** + * clear up Coordinator.queryExecutionMap by calling Coordinator.cleanupQueryExecution if the + * current rpc is finished. We need to make sure the cleanup logic is only called when client + * connection is not active, otherwise the finally code logic in ClientRPCServiceImpl will handle + * that + */ + private synchronized void cleanUpCoordinatorContextMapIfNeeded(Throwable t) { + if (startTimeOfCurrentRpc == -1) { + Coordinator.getInstance().cleanupQueryExecution(context.getLocalQueryId(), null, t); } } @@ -648,13 +646,22 @@ public long getStartExecutionTime() { } @Override - public void recordExecutionTime(long executionTime) { + public synchronized void recordExecutionTime(long executionTime) { totalExecutionTime += executionTime; + // recordExecutionTime is called after current rpc finished, so we need to set + // startTimeOfCurrentRpc to -1 + this.startTimeOfCurrentRpc = -1; + } + + @Override + public synchronized void updateCurrentRpcStartTime(long startTime) { + this.startTimeOfCurrentRpc = startTime; } @Override - public long getTotalExecutionTime() { - return totalExecutionTime; + public synchronized long getTotalExecutionTime() { + return totalExecutionTime + + (startTimeOfCurrentRpc == -1 ? 0 : System.nanoTime() - startTimeOfCurrentRpc); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java index a9d96961834c2..00478d0ab888a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java @@ -72,6 +72,12 @@ public class ConfigExecution implements IQueryExecution { private final StatementType statementType; private long totalExecutionTime; + // -1 if previous rpc is finished and next client req hasn't come yet, unit is ns + // it will be updated in fetchResult rpc + // currently, ConfigExecution will return result is just one call, so this field is not used. But + // we will keep it for future use when ConfigExecution may return result in multiple calls + private long startTimeOfCurrentRpc = System.nanoTime(); + public ConfigExecution( MPPQueryContext context, StatementType statementType, @@ -144,11 +150,6 @@ public void stop(Throwable t) { // do nothing } - @Override - public void stopAndCleanup() { - // do nothing - } - @Override public void stopAndCleanup(Throwable t) { // do nothing @@ -244,11 +245,20 @@ public long getStartExecutionTime() { @Override public void recordExecutionTime(long executionTime) { totalExecutionTime += executionTime; + // recordExecutionTime is called after current rpc finished, so we need to set + // startTimeOfCurrentRpc to -1 + this.startTimeOfCurrentRpc = -1; + } + + @Override + public void updateCurrentRpcStartTime(long startTime) { + this.startTimeOfCurrentRpc = startTime; } @Override public long getTotalExecutionTime() { - return totalExecutionTime; + return totalExecutionTime + + (startTimeOfCurrentRpc == -1 ? 0 : System.nanoTime() - startTimeOfCurrentRpc); } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java index 604d3eb34420d..cf6f5a1ea7525 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java @@ -1798,6 +1798,9 @@ public long getStartExecutionTime() { @Override public void recordExecutionTime(long executionTime) {} + @Override + public void updateCurrentRpcStartTime(long startTime) {} + @Override public long getTotalExecutionTime() { return 0; @@ -1819,9 +1822,6 @@ public void start() {} @Override public void stop(Throwable t) {} - @Override - public void stopAndCleanup() {} - @Override public void stopAndCleanup(Throwable t) {} From 2d9d9d3bfcdbaa5de8e866eb64fa224798bffb06 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Thu, 26 Mar 2026 10:52:17 +0800 Subject: [PATCH 2/4] fix some ci --- .../queryengine/common/MPPQueryContext.java | 6 ++++ .../execution/QueryStateMachine.java | 4 +-- .../fragment/FragmentInstanceManager.java | 2 ++ .../db/queryengine/plan/Coordinator.java | 28 +++++++++++++++++-- .../plan/execution/IQueryExecution.java | 5 ++++ .../plan/execution/QueryExecution.java | 21 ++++++++++---- 6 files changed, 56 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 85c429d090b5f..7479e832a9016 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -52,7 +52,11 @@ public class MPPQueryContext { private long localQueryId; private SessionInfo session; private QueryType queryType = QueryType.READ; + + /** the max executing time of query in ms. Unit: millisecond */ private long timeOut; + + // time unit is ms private long startTime; private TEndPoint localDataBlockEndpoint; @@ -179,10 +183,12 @@ public QueryType getQueryType() { return queryType; } + /** the max executing time of query in ms. Unit: millisecond */ public long getTimeOut() { return timeOut; } + /** the max executing time of query in ms. Unit: millisecond */ public void setTimeOut(long timeOut) { this.timeOut = timeOut; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java index 146359bc215ed..19d0fb38749d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java @@ -107,10 +107,10 @@ public void transitionToCanceled() { transitionToDoneState(CANCELED); } - public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) { + public boolean transitionToCanceled(Throwable throwable, TSStatus failureStatus) { this.failureStatus.compareAndSet(null, failureStatus); this.failureException.compareAndSet(null, throwable); - transitionToDoneState(CANCELED); + return transitionToDoneState(CANCELED); } public void transitionToAborted() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 9a2657101e724..e8d0fd8243201 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler; import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet; +import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.queryengine.plan.planner.PipelineDriverFactory; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; @@ -427,6 +428,7 @@ private void cancelTimeoutFlushingInstances() { + "ms, and now is in flushing state")); } }); + Coordinator.getInstance().cleanUpStaleQueries(); } public ExecutorService getIntoOperationExecutor() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index d9e9e4477b9b1..a636f858d670f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException; import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -241,7 +242,6 @@ public int getQueryExecutionMapSize() { return queryExecutionMap.size(); } - // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management private ExecutorService getQueryExecutor() { int coordinatorReadExecutorSize = CONFIG.getCoordinatorReadExecutorSize(); return IoTDBThreadPoolFactory.newFixedThreadPool( @@ -254,7 +254,6 @@ private ExecutorService getWriteExecutor() { coordinatorWriteExecutorSize, ThreadName.MPP_COORDINATOR_WRITE_EXECUTOR.getName()); } - // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management private ScheduledExecutorService getScheduledExecutor() { return IoTDBThreadPoolFactory.newScheduledThreadPool( COORDINATOR_SCHEDULED_EXECUTOR_SIZE, @@ -299,6 +298,31 @@ public void cleanupQueryExecution( } } + /** + * cWe need to reclaim resources from queries that have exceeded their timeout by more than one + * minute. This indicates that the associated clients have failed to perform proper resource + * cleanup. + */ + public void cleanUpStaleQueries() { + long currentTime = System.currentTimeMillis(); + queryExecutionMap.forEach( + (queryId, queryExecution) -> { + long timeout = queryExecution.getTimeout(); + long queryStartTime = queryExecution.getStartExecutionTime(); + long executeTime = currentTime - queryStartTime; + if (timeout > 0 && executeTime > timeout + 60_000L) { + LOGGER.warn( + "Cleaning up stale query with id {}, which has been running for {} ms", + queryId, + executeTime); + cleanupQueryExecution( + queryId, + null, + new QueryTimeoutRuntimeException(queryStartTime, currentTime, timeout)); + } + }); + } + public void cleanupQueryExecution(Long queryId) { cleanupQueryExecution(queryId, null, null); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java index cf0413a87191f..a27316fb00b91 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java @@ -55,8 +55,12 @@ public interface IQueryExecution { String getQueryId(); + // time unit is ms long getStartExecutionTime(); + /** + * @param executionTime time unit should be ns + */ void recordExecutionTime(long executionTime); /** @@ -75,6 +79,7 @@ public interface IQueryExecution { /** return ip for a thrift-based client, client-id for MQTT/REST client */ String getClientHostname(); + /** the max executing time of query in ms. Unit: millisecond */ long getTimeout(); Optional getExecuteSQL(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 8f14467627b39..1fd53e708308c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -142,9 +142,10 @@ public QueryExecution(IPlanner planner, MPPQueryContext context, ExecutorService LOGGER.debug("[ReleaseQueryResource] state is: {}", state); cause = stateMachine.getFailureException(); releaseResource(cause); + + this.cleanUpCoordinatorContextMapIfNeeded(cause); } this.stop(cause); - this.cleanUpCoordinatorContextMapIfNeeded(cause); } }); this.stopped = new AtomicBoolean(false); @@ -332,10 +333,19 @@ public void stop(Throwable t) { @Override public void cancel() { - stateMachine.transitionToCanceled( - new KilledByOthersException(), - new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode()) - .setMessage(KilledByOthersException.MESSAGE)); + Throwable cause = new KilledByOthersException(); + boolean cancelled = + stateMachine.transitionToCanceled( + cause, + new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode()) + .setMessage(KilledByOthersException.MESSAGE)); + if (!cancelled) { + // cancel failed, means this query has already in a done state, we can do nothing to change + // the state but clean up the resource if needed + // we don't need to do cleanUpCoordinatorContextMapIfNeeded if cancel succeed, because it will + // be called in callback logic in QueryStateMachine of this QueryExecution + this.cleanUpCoordinatorContextMapIfNeeded(cause); + } } private void cleanUpResultHandle() { @@ -377,7 +387,6 @@ private void releaseResource(Throwable t) { resultHandle.close(); } cleanUpResultHandle(); - resultHandle = null; } } From a566e4672244b72591e2a0d9236e2292ca23ca70 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Thu, 26 Mar 2026 16:48:38 +0800 Subject: [PATCH 3/4] fix some ci --- .../thrift/impl/ClientRPCServiceImpl.java | 2 +- .../db/queryengine/plan/Coordinator.java | 2 +- .../schema/ClusterSchemaFetchExecutor.java | 38 +++++++++++-------- .../plan/execution/QueryExecution.java | 10 ++--- .../execution/config/ConfigExecution.java | 2 +- 5 files changed, 30 insertions(+), 24 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index d9913bf42c4c6..8a498fce2e3d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -1712,7 +1712,7 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) { resp.setHasResultSet(hasResultSet); resp.setQueryDataSet(result); resp.setIsAlign(true); - resp.setMoreData(finished); + resp.setMoreData(!finished); return resp; } } catch (Exception e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index a636f858d670f..f52d3fe315fce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -299,7 +299,7 @@ public void cleanupQueryExecution( } /** - * cWe need to reclaim resources from queries that have exceeded their timeout by more than one + * We need to reclaim resources from queries that have exceeded their timeout by more than one * minute. This indicates that the associated clients have failed to perform proper resource * cleanup. */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java index bcea21a666eee..59978500793ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; +import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.internal.DeviceSchemaFetchStatement; import org.apache.iotdb.db.queryengine.plan.statement.internal.SeriesSchemaFetchStatement; @@ -250,29 +251,34 @@ private ClusterSchemaTree executeSchemaFetchQuery( String.format("Fetch Schema failed, because %s", executionResult.status.getMessage()), executionResult.status.getCode()); } + IQueryExecution queryExecution = coordinator.getQueryExecution(queryId); try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) { ClusterSchemaTree result = new ClusterSchemaTree(); ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer = new ClusterSchemaTree.SchemaNodeBatchDeserializer(); Set databaseSet = new HashSet<>(); - while (coordinator.getQueryExecution(queryId).hasNextResult()) { - // The query will be transited to FINISHED when invoking getBatchResult() at the last time - // So we don't need to clean up it manually - Optional tsBlock; - try { - tsBlock = coordinator.getQueryExecution(queryId).getBatchResult(); - } catch (IoTDBException e) { - t = e; - throw new RuntimeException("Fetch Schema failed. ", e); - } - if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { - break; - } - Column column = tsBlock.get().getColumn(0); - for (int i = 0; i < column.getPositionCount(); i++) { - parseFetchedData(column.getBinary(i), result, deserializer, databaseSet, context); + if (queryExecution != null) { + while (queryExecution.hasNextResult()) { + // The query will be transited to FINISHED when invoking getBatchResult() at the last + // time + // So we don't need to clean up it manually + Optional tsBlock; + try { + tsBlock = queryExecution.getBatchResult(); + } catch (IoTDBException e) { + t = e; + throw new RuntimeException("Fetch Schema failed. ", e); + } + if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { + break; + } + Column column = tsBlock.get().getColumn(0); + for (int i = 0; i < column.getPositionCount(); i++) { + parseFetchedData(column.getBinary(i), result, deserializer, databaseSet, context); + } } } + result.setDatabases(databaseSet); return result; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 1fd53e708308c..00797c6a42071 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -111,7 +111,7 @@ public class QueryExecution implements IQueryExecution { // -1 if previous rpc is finished and next client req hasn't come yet, unit is ns // it will be updated in fetchResult rpc // protected by synchronized(this) - private long startTimeOfCurrentRpc = System.nanoTime(); + private volatile long startTimeOfCurrentRpc = System.nanoTime(); private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET = QueryExecutionMetricSet.getInstance(); @@ -391,10 +391,10 @@ private void releaseResource(Throwable t) { } /** - * clear up Coordinator.queryExecutionMap by calling Coordinator.cleanupQueryExecution if the - * current rpc is finished. We need to make sure the cleanup logic is only called when client - * connection is not active, otherwise the finally code logic in ClientRPCServiceImpl will handle - * that + * Clear up Coordinator.queryExecutionMap by calling Coordinator.cleanupQueryExecution if there is + * no RPC in progress for this query (that is, the current RPC has finished and {@code + * startTimeOfCurrentRpc == -1}). In cases where an RPC is still active, the finally block in + * ClientRPCServiceImpl is responsible for performing the cleanup. */ private synchronized void cleanUpCoordinatorContextMapIfNeeded(Throwable t) { if (startTimeOfCurrentRpc == -1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java index ab835735789ce..b59684ec65a16 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java @@ -76,7 +76,7 @@ public class ConfigExecution implements IQueryExecution { // it will be updated in fetchResult rpc // currently, ConfigExecution will return result is just one call, so this field is not used. But // we will keep it for future use when ConfigExecution may return result in multiple calls - private long startTimeOfCurrentRpc = System.nanoTime(); + private volatile long startTimeOfCurrentRpc = System.nanoTime(); public ConfigExecution( MPPQueryContext context, From e125abc419f26511399ea53a9a7909a063f9fd66 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Thu, 26 Mar 2026 17:45:20 +0800 Subject: [PATCH 4/4] fix some ci --- .../iotdb/db/queryengine/plan/Coordinator.java | 10 +++++++--- .../analyze/schema/ClusterSchemaFetchExecutor.java | 3 +++ .../plan/execution/IQueryExecution.java | 10 ++++++++++ .../queryengine/plan/execution/QueryExecution.java | 14 +++++++++++++- .../plan/execution/config/ConfigExecution.java | 5 +++++ .../execution/operator/MergeSortOperatorTest.java | 5 +++++ 6 files changed, 43 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index f52d3fe315fce..8afdfce7d5c65 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -307,14 +307,18 @@ public void cleanUpStaleQueries() { long currentTime = System.currentTimeMillis(); queryExecutionMap.forEach( (queryId, queryExecution) -> { + if (queryExecution.isActive()) { + return; + } long timeout = queryExecution.getTimeout(); long queryStartTime = queryExecution.getStartExecutionTime(); long executeTime = currentTime - queryStartTime; - if (timeout > 0 && executeTime > timeout + 60_000L) { + if (timeout > 0 && executeTime - 60_000L > timeout) { LOGGER.warn( - "Cleaning up stale query with id {}, which has been running for {} ms", + "Cleaning up stale query with id {}, which has been running for {} ms, timeout duration is: {}ms", queryId, - executeTime); + executeTime, + timeout); cleanupQueryExecution( queryId, null, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java index 59978500793ef..7a30a6fa50f19 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -277,6 +277,9 @@ private ClusterSchemaTree executeSchemaFetchQuery( parseFetchedData(column.getBinary(i), result, deserializer, databaseSet, context); } } + } else { + throw new RuntimeException( + String.format("Fetch Schema failed, because queryExecution is null for %s", queryId)); } result.setDatabases(databaseSet); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java index a27316fb00b91..09f25a781e1fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java @@ -71,6 +71,16 @@ public interface IQueryExecution { */ void updateCurrentRpcStartTime(long startTime); + /** + * Check if there is an active RPC for this query. If {@code startTimeOfCurrentRpc == -1}, it + * means there is no active RPC, otherwise there is an active RPC. An active RPC means that the + * client is still fetching results and the QueryExecution should not be cleaned up until the RPC + * finishes. On the other hand, if there is no active RPC, it means that the client has finished + * fetching results or has not started fetching results yet, and the QueryExecution can be safely + * cleaned up. + */ + boolean isActive(); + /** * @return cost time in ns */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 00797c6a42071..8de6165525e29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -397,11 +397,23 @@ private void releaseResource(Throwable t) { * ClientRPCServiceImpl is responsible for performing the cleanup. */ private synchronized void cleanUpCoordinatorContextMapIfNeeded(Throwable t) { - if (startTimeOfCurrentRpc == -1) { + if (isActive()) { Coordinator.getInstance().cleanupQueryExecution(context.getLocalQueryId(), null, t); } } + /** + * Check if there is an active RPC for this query. If {@code startTimeOfCurrentRpc == -1}, it + * means there is no active RPC, otherwise there is an active RPC. An active RPC means that the + * client is still fetching results and the QueryExecution should not be cleaned up until the RPC + * finishes. On the other hand, if there is no active RPC, it means that the client has finished + * fetching results or has not started fetching results yet, and the QueryExecution can be safely + * cleaned up. + */ + public synchronized boolean isActive() { + return startTimeOfCurrentRpc == -1; + } + /** * This method will be called by the request thread from client connection. This method will block * until one of these conditions occurs: 1. There is a batch of result 2. There is no more result diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java index b59684ec65a16..e3cf2408f068a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java @@ -255,6 +255,11 @@ public void updateCurrentRpcStartTime(long startTime) { this.startTimeOfCurrentRpc = startTime; } + @Override + public boolean isActive() { + return startTimeOfCurrentRpc == -1; + } + @Override public long getTotalExecutionTime() { return totalExecutionTime diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java index 2ef0856cca794..76ec370ad98be 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java @@ -1801,6 +1801,11 @@ public void recordExecutionTime(long executionTime) {} @Override public void updateCurrentRpcStartTime(long startTime) {} + @Override + public boolean isActive() { + return true; + } + @Override public long getTotalExecutionTime() { return 0;