diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index d3f6b7d91dd6..ed754c418c63 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -245,6 +245,7 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize; import static org.apache.iotdb.rpc.RpcUtils.TIME_PRECISION; +import static org.apache.iotdb.rpc.TSStatusCode.QUERY_WAS_KILLED; public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @@ -286,6 +287,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + private static final String NO_QUERY_EXECUTION_ERR_MSG = + "Query is not found, it may be killed by others, timeout or some other runtime errors, you can see more details in server log."; + @FunctionalInterface public interface SelectResult { @@ -1526,15 +1530,18 @@ public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) { finished = true; return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus()); } - TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); queryExecution = COORDINATOR.getQueryExecution(req.queryId); if (queryExecution == null) { - resp.setHasResultSet(false); - resp.setMoreData(false); - return resp; + TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode()); + noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG); + return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus); } + + TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); + + queryExecution.updateCurrentRpcStartTime(startTime); statementType = queryExecution.getStatementType(); try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { @@ -2272,16 +2279,16 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) { return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus()); } - TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); - queryExecution = COORDINATOR.getQueryExecution(req.queryId); if (queryExecution == null) { - resp.setHasResultSet(false); - resp.setMoreData(true); - return resp; + TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode()); + noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG); + return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus); } + queryExecution.updateCurrentRpcStartTime(startTime); statementType = queryExecution.getStatementType(); + TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { Pair pair = convertTsBlockByFetchSize(queryExecution, req.fetchSize); @@ -2291,7 +2298,7 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) { resp.setHasResultSet(hasResultSet); resp.setQueryDataSet(result); resp.setIsAlign(true); - resp.setMoreData(finished); + resp.setMoreData(!finished); return resp; } } catch (Exception e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 0294a14af25c..88bd1998f683 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -76,7 +76,11 @@ public enum ExplainType { private long 
localQueryId; private SessionInfo session; private QueryType queryType = QueryType.READ; + + /** the max executing time of query in ms. Unit: millisecond */ private long timeOut; + + // time unit is ms private long startTime; private TEndPoint localDataBlockEndpoint; @@ -147,6 +151,7 @@ public enum ExplainType { // Tables in the subquery private final Map, List> subQueryTables = new HashMap<>(); + @TestOnly public MPPQueryContext(QueryId queryId) { this.queryId = queryId; this.endPointBlackList = ConcurrentHashMap.newKeySet(); @@ -161,12 +166,7 @@ public MPPQueryContext( SessionInfo session, TEndPoint localDataBlockEndpoint, TEndPoint localInternalEndpoint) { - this(queryId); - this.sql = sql; - this.session = session; - this.localDataBlockEndpoint = localDataBlockEndpoint; - this.localInternalEndpoint = localInternalEndpoint; - this.initResultNodeContext(); + this(sql, queryId, -1, session, localDataBlockEndpoint, localInternalEndpoint); } public MPPQueryContext( @@ -244,10 +244,12 @@ public QueryType getQueryType() { return queryType; } + /** the max executing time of query in ms. Unit: millisecond */ public long getTimeOut() { return timeOut; } + /** the max executing time of query in ms. 
Unit: millisecond */ public void setTimeOut(long timeOut) { this.timeOut = timeOut; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java index 146359bc215e..19d0fb38749d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java @@ -107,10 +107,10 @@ public void transitionToCanceled() { transitionToDoneState(CANCELED); } - public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) { + public boolean transitionToCanceled(Throwable throwable, TSStatus failureStatus) { this.failureStatus.compareAndSet(null, failureStatus); this.failureException.compareAndSet(null, throwable); - transitionToDoneState(CANCELED); + return transitionToDoneState(CANCELED); } public void transitionToAborted() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 3f6812943d6d..1898cbfe53cc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler; import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet; +import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import 
org.apache.iotdb.db.queryengine.plan.planner.PipelineDriverFactory; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; @@ -435,6 +436,7 @@ private void cancelTimeoutFlushingInstances() { + "ms, and now is in flushing state")); } }); + Coordinator.getInstance().cleanUpStaleQueries(); } public ExecutorService getIntoOperationExecutor() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index b0d28f03c02a..9342b3a4674b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.protocol.session.PreparedStatementInfo; @@ -805,7 +806,7 @@ public QueryId createQueryId() { } public void cleanupQueryExecution(Long queryId, Supplier contentSupplier, Throwable t) { - IQueryExecution queryExecution = getQueryExecution(queryId); + IQueryExecution queryExecution = queryExecutionMap.remove(queryId); if (queryExecution != null) { cleanupQueryExecutionInternal(queryId, queryExecution, contentSupplier, t); } @@ -813,7 +814,7 @@ public void cleanupQueryExecution(Long queryId, Supplier contentSupplier public void cleanupQueryExecution( Long queryId, org.apache.thrift.TBase nativeApiRequest, Throwable t) { - IQueryExecution queryExecution = getQueryExecution(queryId); + IQueryExecution queryExecution = queryExecutionMap.remove(queryId); if (queryExecution != null) { Supplier contentSupplier = new 
ContentOfQuerySupplier(nativeApiRequest, queryExecution); @@ -898,6 +899,35 @@ public static void recordQueries( } } + /** + * We need to reclaim resources from queries that have exceeded their timeout by more than one + * minute. This indicates that the associated clients have failed to perform proper resource + * cleanup. + */ + public void cleanUpStaleQueries() { + long currentTime = System.currentTimeMillis(); + queryExecutionMap.forEach( + (queryId, queryExecution) -> { + if (queryExecution.isActive()) { + return; + } + long timeout = queryExecution.getTimeout(); + long queryStartTime = queryExecution.getStartExecutionTime(); + long executeTime = currentTime - queryStartTime; + if (timeout > 0 && executeTime - 60_000L > timeout) { + LOGGER.warn( + "Cleaning up stale query with id {}, which has been running for {} ms, timeout duration is: {}ms", + queryId, + executeTime, + timeout); + cleanupQueryExecution( + queryId, + (org.apache.thrift.TBase) null, + new QueryTimeoutRuntimeException(queryStartTime, currentTime, timeout)); + } + }); + } + public void cleanupQueryExecution(Long queryId) { cleanupQueryExecution(queryId, (org.apache.thrift.TBase) null, null); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java index aad9f50aca0d..d575f6420eb7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; +import 
org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.internal.DeviceSchemaFetchStatement; import org.apache.iotdb.db.queryengine.plan.statement.internal.SeriesSchemaFetchStatement; @@ -265,30 +266,38 @@ private ClusterSchemaTree executeSchemaFetchQuery( String.format("Fetch Schema failed, because %s", executionResult.status.getMessage()), executionResult.status.getCode()); } + IQueryExecution queryExecution = coordinator.getQueryExecution(queryId); try (SetThreadName ignored = new SetThreadName(executionResult.queryId.getId())) { ClusterSchemaTree result = new ClusterSchemaTree(); ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer = new ClusterSchemaTree.SchemaNodeBatchDeserializer(); Set databaseSet = new HashSet<>(); - while (coordinator.getQueryExecution(queryId).hasNextResult()) { - // The query will be transited to FINISHED when invoking getBatchResult() at the last time - // So we don't need to clean up it manually - Optional tsBlock; - try { - tsBlock = coordinator.getQueryExecution(queryId).getBatchResult(); - } catch (IoTDBException e) { - t = e; - throw new QuerySchemaFetchFailedException( - String.format("Fetch Schema failed: %s", e.getMessage()), e.getErrorCode()); - } - if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { - break; - } - Column column = tsBlock.get().getColumn(0); - for (int i = 0; i < column.getPositionCount(); i++) { - parseFetchedData(column.getBinary(i), result, deserializer, databaseSet, context); + if (queryExecution != null) { + while (queryExecution.hasNextResult()) { + // The query will be transited to FINISHED when invoking getBatchResult() at the last + // time + // So we don't need to clean up it manually + Optional tsBlock; + try { + tsBlock = queryExecution.getBatchResult(); + } catch (IoTDBException e) { + t = e; + throw new QuerySchemaFetchFailedException( + String.format("Fetch 
Schema failed: %s", e.getMessage()), e.getErrorCode()); + } + if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { + break; + } + Column column = tsBlock.get().getColumn(0); + for (int i = 0; i < column.getPositionCount(); i++) { + parseFetchedData(column.getBinary(i), result, deserializer, databaseSet, context); + } } + } else { + throw new RuntimeException( + String.format("Fetch Schema failed, because queryExecution is null for %s", queryId)); } + result.setDatabases(databaseSet); return result; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java index f754bc4b10e7..9b5a183d98a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java @@ -35,8 +35,6 @@ public interface IQueryExecution { void stop(Throwable t); - void stopAndCleanup(); - void stopAndCleanup(Throwable t); void cancel(); @@ -61,15 +59,38 @@ public interface IQueryExecution { String getQueryId(); + // time unit is ms long getStartExecutionTime(); + /** + * @param executionTime time unit should be ns + */ void recordExecutionTime(long executionTime); + /** + * update current rpc start time, which is used to calculate rpc execution time and update total + * execution time + * + * @param startTime start time of current rpc, time unit is ns + */ + void updateCurrentRpcStartTime(long startTime); + + /** + * Check if there is an active RPC for this query. If {@code startTimeOfCurrentRpc == -1}, it + * means there is no active RPC, otherwise there is an active RPC. An active RPC means that the + * client is still fetching results and the QueryExecution should not be cleaned up until the RPC + * finishes. 
On the other hand, if there is no active RPC, it means that the client has finished + * fetching results or has not started fetching results yet, and the QueryExecution can be safely + * cleaned up. + */ + boolean isActive(); + /** * @return cost time in ns */ long getTotalExecutionTime(); + /** the max executing time of query in ms. Unit: millisecond */ long getTimeout(); Optional getExecuteSQL(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 6e940e9816e2..7500fbb9c34d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -39,6 +39,7 @@ import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle; import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet; +import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.execution.memory.MemorySourceHandle; @@ -108,9 +109,14 @@ public class QueryExecution implements IQueryExecution { private final AtomicBoolean stopped; - // cost time in ns + // cost time in ns of finished rpc private long totalExecutionTime = 0; + // -1 if previous rpc is finished and next client req hasn't come yet, unit is ns + // it will be updated in fetchResult rpc + // protected by synchronized(this) + private volatile long startTimeOfCurrentRpc = System.nanoTime(); + private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET = QueryExecutionMetricSet.getInstance(); private static final QueryPlanCostMetricSet 
QUERY_PLAN_COST_METRIC_SET = @@ -133,14 +139,19 @@ public QueryExecution(IPlanner planner, MPPQueryContext context, ExecutorService if (!state.isDone()) { return; } + Throwable cause = null; if (state == QueryState.FAILED || state == QueryState.ABORTED || state == QueryState.CANCELED) { - LOGGER.debug("[ReleaseQueryResource] state is: {}", state); - Throwable cause = stateMachine.getFailureException(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("[ReleaseQueryResource] state is: {}", state); + } + cause = stateMachine.getFailureException(); releaseResource(cause); + + this.cleanUpCoordinatorContextMapIfNeeded(cause); } - this.stop(null); + this.stop(cause); } }); this.stopped = new AtomicBoolean(false); @@ -321,33 +332,20 @@ public void stop(Throwable t) { } } - // Stop the query and clean up all the resources this query occupied - @Override - public void stopAndCleanup() { - stop(null); - releaseResource(); - } - @Override public void cancel() { - stateMachine.transitionToCanceled( - new KilledByOthersException(), - new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode()) - .setMessage(KilledByOthersException.MESSAGE)); - } - - /** Release the resources that current QueryExecution hold. */ - private void releaseResource() { - // close ResultHandle to unblock client's getResult request - // Actually, we should not close the ResultHandle when the QueryExecution is Finished. - // There are only two scenarios where the ResultHandle should be closed: - // 1. The client fetch all the result and the ResultHandle is finished. - // 2. The client's connection is closed that all owned QueryExecution should be cleaned up - // If the QueryExecution's state is abnormal, we should also abort the resultHandle without - // waiting it to be finished. 
- if (resultHandle != null) { - resultHandle.close(); - cleanUpResultHandle(); + Throwable cause = new KilledByOthersException(); + boolean cancelled = + stateMachine.transitionToCanceled( + cause, + new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode()) + .setMessage(KilledByOthersException.MESSAGE)); + if (!cancelled) { + // cancel failed, means this query has already in a done state, we can do nothing to change + // the state but clean up the resource if needed + // we don't need to do cleanUpCoordinatorContextMapIfNeeded if cancel succeed, because it will + // be called in callback logic in QueryStateMachine of this QueryExecution + this.cleanUpCoordinatorContextMapIfNeeded(cause); } } @@ -356,7 +354,7 @@ private void cleanUpResultHandle() { // We don't need to deal with MemorySourceHandle because it doesn't register to memory pool // We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream // FragmentInstanceId to register - if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle instanceof SourceHandle) { + if (resultHandle instanceof SourceHandle) { TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId(); MPPDataExchangeService.getInstance() .getMPPDataExchangeManager() @@ -384,7 +382,7 @@ private void releaseResource(Throwable t) { // 2. The client's connection is closed that all owned QueryExecution should be cleaned up // If the QueryExecution's state is abnormal, we should also abort the resultHandle without // waiting it to be finished. 
- if (resultHandle != null) { + if (resultHandle != null && resultHandleCleanUp.compareAndSet(false, true)) { if (t != null) { resultHandle.abort(t); } else { @@ -394,6 +392,32 @@ private void releaseResource(Throwable t) { } } + /** + * Clear up Coordinator.queryExecutionMap by calling Coordinator.cleanupQueryExecution if there is + * no RPC in progress for this query (that is, the current RPC has finished and {@code + * startTimeOfCurrentRpc == -1}). In cases where an RPC is still active, the finally block in + * ClientRPCServiceImpl is responsible for performing the cleanup. + */ + private synchronized void cleanUpCoordinatorContextMapIfNeeded(Throwable t) { + if (!isActive()) { + Coordinator.getInstance() + .cleanupQueryExecution( + context.getLocalQueryId(), (org.apache.thrift.TBase) null, t); + } + } + + /** + * Check if there is an active RPC for this query. If {@code startTimeOfCurrentRpc == -1}, it + * means there is no active RPC, otherwise there is an active RPC. An active RPC means that the + * client is still fetching results and the QueryExecution should not be cleaned up until the RPC + * finishes. On the other hand, if there is no active RPC, it means that the client has finished + * fetching results or has not started fetching results yet, and the QueryExecution can be safely + * cleaned up. + */ + public synchronized boolean isActive() { + return startTimeOfCurrentRpc != -1; + } + /** * This method will be called by the request thread from client connection. This method will block * until one of these conditions occurs: 1. There is a batch of result 2. 
There is no more result @@ -671,13 +695,22 @@ public long getStartExecutionTime() { } @Override - public void recordExecutionTime(long executionTime) { + public synchronized void recordExecutionTime(long executionTime) { totalExecutionTime += executionTime; + // recordExecutionTime is called after current rpc finished, so we need to set + // startTimeOfCurrentRpc to -1 + this.startTimeOfCurrentRpc = -1; + } + + @Override + public synchronized void updateCurrentRpcStartTime(long startTime) { + this.startTimeOfCurrentRpc = startTime; } @Override - public long getTotalExecutionTime() { - return totalExecutionTime; + public synchronized long getTotalExecutionTime() { + return totalExecutionTime + + (startTimeOfCurrentRpc == -1 ? 0 : System.nanoTime() - startTimeOfCurrentRpc); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java index f431bb2e51fb..2b70a667be90 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java @@ -113,6 +113,12 @@ public class ConfigExecution implements IQueryExecution { private final StatementType statementType; private long totalExecutionTime; + // -1 if previous rpc is finished and next client req hasn't come yet, unit is ns + // it will be updated in fetchResult rpc + // currently, ConfigExecution will return result is just one call, so this field is not used. 
But + // we will keep it for future use when ConfigExecution may return result in multiple calls + private volatile long startTimeOfCurrentRpc = System.nanoTime(); + public ConfigExecution( MPPQueryContext context, StatementType statementType, @@ -222,11 +228,6 @@ public void stop(Throwable t) { // do nothing } - @Override - public void stopAndCleanup() { - // do nothing - } - @Override public void stopAndCleanup(Throwable t) { // do nothing @@ -327,11 +328,25 @@ public long getStartExecutionTime() { @Override public void recordExecutionTime(long executionTime) { totalExecutionTime += executionTime; + // recordExecutionTime is called after current rpc finished, so we need to set + // startTimeOfCurrentRpc to -1 + this.startTimeOfCurrentRpc = -1; + } + + @Override + public void updateCurrentRpcStartTime(long startTime) { + this.startTimeOfCurrentRpc = startTime; + } + + @Override + public boolean isActive() { + return startTimeOfCurrentRpc != -1; } @Override public long getTotalExecutionTime() { - return totalExecutionTime; + return totalExecutionTime + + (startTimeOfCurrentRpc == -1 ? 
0 : System.nanoTime() - startTimeOfCurrentRpc); } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java index 1739944be7c4..2d0c1bf543c3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java @@ -1826,6 +1826,14 @@ public long getStartExecutionTime() { @Override public void recordExecutionTime(long executionTime) {} + @Override + public void updateCurrentRpcStartTime(long startTime) {} + + @Override + public boolean isActive() { + return true; + } + @Override public long getTotalExecutionTime() { return 0; @@ -1857,9 +1865,6 @@ public void start() {} @Override public void stop(Throwable t) {} - @Override - public void stopAndCleanup() {} - @Override public void stopAndCleanup(Throwable t) {}