diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index b5edafec160c2..8a498fce2e3d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -205,6 +205,7 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize; +import static org.apache.iotdb.rpc.TSStatusCode.QUERY_WAS_KILLED; public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @@ -241,6 +242,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + private static final String NO_QUERY_EXECUTION_ERR_MSG = + "Query is not found, it may be killed by others, timeout or some other runtime errors, you can see more details in server log."; + @FunctionalInterface public interface SelectResult { boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize) @@ -1147,15 +1151,18 @@ public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) { finished = true; return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus()); } - TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); queryExecution = COORDINATOR.getQueryExecution(req.queryId); if (queryExecution == null) { - resp.setHasResultSet(false); - resp.setMoreData(false); - return resp; + TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode()); + 
noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG); + return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus); } + + TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); + + queryExecution.updateCurrentRpcStartTime(startTime); statementType = queryExecution.getStatementType(); try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { @@ -1686,16 +1693,16 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) { return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus()); } - TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); - queryExecution = COORDINATOR.getQueryExecution(req.queryId); if (queryExecution == null) { - resp.setHasResultSet(false); - resp.setMoreData(true); - return resp; + TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode()); + noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG); + return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus); } + queryExecution.updateCurrentRpcStartTime(startTime); statementType = queryExecution.getStatementType(); + TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { Pair pair = convertTsBlockByFetchSize(queryExecution, req.fetchSize); @@ -1705,7 +1712,7 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) { resp.setHasResultSet(hasResultSet); resp.setQueryDataSet(result); resp.setIsAlign(true); - resp.setMoreData(finished); + resp.setMoreData(!finished); return resp; } } catch (Exception e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index fdd50c447f732..7479e832a9016 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java 
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; @@ -51,7 +52,11 @@ public class MPPQueryContext { private long localQueryId; private SessionInfo session; private QueryType queryType = QueryType.READ; + + /** the max executing time of query in ms. Unit: millisecond */ private long timeOut; + + // time unit is ms private long startTime; private TEndPoint localDataBlockEndpoint; @@ -95,6 +100,7 @@ public class MPPQueryContext { private boolean userQuery = false; + @TestOnly public MPPQueryContext(QueryId queryId) { this.queryId = queryId; this.endPointBlackList = ConcurrentHashMap.newKeySet(); @@ -102,19 +108,14 @@ public MPPQueryContext(QueryId queryId) { new NotThreadSafeMemoryReservationManager(queryId, this.getClass().getName()); } - // TODO too many callers just pass a null SessionInfo which should be forbidden + @TestOnly public MPPQueryContext( String sql, QueryId queryId, SessionInfo session, TEndPoint localDataBlockEndpoint, TEndPoint localInternalEndpoint) { - this(queryId); - this.sql = sql; - this.session = session; - this.localDataBlockEndpoint = localDataBlockEndpoint; - this.localInternalEndpoint = localInternalEndpoint; - this.initResultNodeContext(); + this(sql, queryId, -1, session, localDataBlockEndpoint, localInternalEndpoint); } public MPPQueryContext( @@ -182,10 +183,12 @@ public QueryType getQueryType() { return queryType; } + /** the max executing time of query in ms. Unit: millisecond */ public long getTimeOut() { return timeOut; } + /** the max executing time of query in ms. 
Unit: millisecond */ public void setTimeOut(long timeOut) { this.timeOut = timeOut; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java index 146359bc215ed..19d0fb38749d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java @@ -107,10 +107,10 @@ public void transitionToCanceled() { transitionToDoneState(CANCELED); } - public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) { + public boolean transitionToCanceled(Throwable throwable, TSStatus failureStatus) { this.failureStatus.compareAndSet(null, failureStatus); this.failureException.compareAndSet(null, throwable); - transitionToDoneState(CANCELED); + return transitionToDoneState(CANCELED); } public void transitionToAborted() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 9a2657101e724..e8d0fd8243201 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler; import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet; +import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import 
org.apache.iotdb.db.queryengine.plan.planner.PipelineDriverFactory; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; @@ -427,6 +428,7 @@ private void cancelTimeoutFlushingInstances() { + "ms, and now is in flushing state")); } }); + Coordinator.getInstance().cleanUpStaleQueries(); } public ExecutorService getIntoOperationExecutor() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index ed76b837e801d..8afdfce7d5c65 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException; import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -241,7 +242,6 @@ public int getQueryExecutionMapSize() { return queryExecutionMap.size(); } - // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management private ExecutorService getQueryExecutor() { int coordinatorReadExecutorSize = CONFIG.getCoordinatorReadExecutorSize(); return IoTDBThreadPoolFactory.newFixedThreadPool( @@ -254,7 +254,6 @@ private ExecutorService getWriteExecutor() { coordinatorWriteExecutorSize, ThreadName.MPP_COORDINATOR_WRITE_EXECUTOR.getName()); } - // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management private ScheduledExecutorService getScheduledExecutor() { return IoTDBThreadPoolFactory.newScheduledThreadPool( COORDINATOR_SCHEDULED_EXECUTOR_SIZE, @@ -267,12 +266,11 @@ public 
QueryId createQueryId() { public void cleanupQueryExecution( Long queryId, org.apache.thrift.TBase nativeApiRequest, Throwable t) { - IQueryExecution queryExecution = getQueryExecution(queryId); + IQueryExecution queryExecution = queryExecutionMap.remove(queryId); if (queryExecution != null) { try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) { LOGGER.debug("[CleanUpQuery]]"); queryExecution.stopAndCleanup(t); - queryExecutionMap.remove(queryId); if (queryExecution.isQuery() && queryExecution.isUserQuery()) { long costTime = queryExecution.getTotalExecutionTime(); // print slow query @@ -300,6 +298,35 @@ public void cleanupQueryExecution( } } + /** + * We need to reclaim resources from queries that have exceeded their timeout by more than one + * minute. This indicates that the associated clients have failed to perform proper resource + * cleanup. + */ + public void cleanUpStaleQueries() { + long currentTime = System.currentTimeMillis(); + queryExecutionMap.forEach( + (queryId, queryExecution) -> { + if (queryExecution.isActive()) { + return; + } + long timeout = queryExecution.getTimeout(); + long queryStartTime = queryExecution.getStartExecutionTime(); + long executeTime = currentTime - queryStartTime; + if (timeout > 0 && executeTime - 60_000L > timeout) { + LOGGER.warn( + "Cleaning up stale query with id {}, which has been running for {} ms, timeout duration is: {}ms", + queryId, + executeTime, + timeout); + cleanupQueryExecution( + queryId, + null, + new QueryTimeoutRuntimeException(queryStartTime, currentTime, timeout)); + } + }); + } + public void cleanupQueryExecution(Long queryId) { cleanupQueryExecution(queryId, null, null); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java index bcea21a666eee..7a30a6fa50f19 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; +import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.internal.DeviceSchemaFetchStatement; import org.apache.iotdb.db.queryengine.plan.statement.internal.SeriesSchemaFetchStatement; @@ -250,29 +251,37 @@ private ClusterSchemaTree executeSchemaFetchQuery( String.format("Fetch Schema failed, because %s", executionResult.status.getMessage()), executionResult.status.getCode()); } + IQueryExecution queryExecution = coordinator.getQueryExecution(queryId); try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) { ClusterSchemaTree result = new ClusterSchemaTree(); ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer = new ClusterSchemaTree.SchemaNodeBatchDeserializer(); Set databaseSet = new HashSet<>(); - while (coordinator.getQueryExecution(queryId).hasNextResult()) { - // The query will be transited to FINISHED when invoking getBatchResult() at the last time - // So we don't need to clean up it manually - Optional tsBlock; - try { - tsBlock = coordinator.getQueryExecution(queryId).getBatchResult(); - } catch (IoTDBException e) { - t = e; - throw new RuntimeException("Fetch Schema failed. 
", e); - } - if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { - break; - } - Column column = tsBlock.get().getColumn(0); - for (int i = 0; i < column.getPositionCount(); i++) { - parseFetchedData(column.getBinary(i), result, deserializer, databaseSet, context); + if (queryExecution != null) { + while (queryExecution.hasNextResult()) { + // The query will be transited to FINISHED when invoking getBatchResult() at the last + // time + // So we don't need to clean up it manually + Optional tsBlock; + try { + tsBlock = queryExecution.getBatchResult(); + } catch (IoTDBException e) { + t = e; + throw new RuntimeException("Fetch Schema failed. ", e); + } + if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { + break; + } + Column column = tsBlock.get().getColumn(0); + for (int i = 0; i < column.getPositionCount(); i++) { + parseFetchedData(column.getBinary(i), result, deserializer, databaseSet, context); + } } + } else { + throw new RuntimeException( + String.format("Fetch Schema failed, because queryExecution is null for %s", queryId)); } + result.setDatabases(databaseSet); return result; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java index 61824aa4f0208..09f25a781e1fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java @@ -33,8 +33,6 @@ public interface IQueryExecution { void stop(Throwable t); - void stopAndCleanup(); - void stopAndCleanup(Throwable t); void cancel(); @@ -57,10 +55,32 @@ public interface IQueryExecution { String getQueryId(); + // time unit is ms long getStartExecutionTime(); + /** + * @param executionTime time unit should be ns + */ void recordExecutionTime(long executionTime); + /** + * update current 
rpc start time, which is used to calculate rpc execution time and update total + * execution time + * + * @param startTime start time of current rpc, time unit is ns + */ + void updateCurrentRpcStartTime(long startTime); + + /** + * Check if there is an active RPC for this query. If {@code startTimeOfCurrentRpc == -1}, it + * means there is no active RPC, otherwise there is an active RPC. An active RPC means that the + * client is still fetching results and the QueryExecution should not be cleaned up until the RPC + * finishes. On the other hand, if there is no active RPC, it means that the client has finished + * fetching results or has not started fetching results yet, and the QueryExecution can be safely + * cleaned up. + */ + boolean isActive(); + /** * @return cost time in ns */ @@ -69,6 +89,7 @@ public interface IQueryExecution { /** return ip for a thrift-based client, client-id for MQTT/REST client */ String getClientHostname(); + /** the max executing time of query in ms. Unit: millisecond */ long getTimeout(); Optional getExecuteSQL(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 43f461330533e..8de6165525e29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle; import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet; +import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import 
org.apache.iotdb.db.queryengine.plan.execution.memory.MemorySourceHandle; @@ -104,9 +105,14 @@ public class QueryExecution implements IQueryExecution { private final AtomicBoolean stopped; - // cost time in ns + // cost time in ns of finished rpc private long totalExecutionTime = 0; + // -1 if previous rpc is finished and next client req hasn't come yet, unit is ns + // it will be updated in fetchResult rpc + // protected by synchronized(this) + private volatile long startTimeOfCurrentRpc = System.nanoTime(); + private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET = QueryExecutionMetricSet.getInstance(); private static final QueryPlanCostMetricSet QUERY_PLAN_COST_METRIC_SET = @@ -129,16 +135,17 @@ public QueryExecution(IPlanner planner, MPPQueryContext context, ExecutorService if (!state.isDone()) { return; } - // TODO: (xingtanzjr) If the query is in abnormal state, the releaseResource() should be - // invoked + Throwable cause = null; if (state == QueryState.FAILED || state == QueryState.ABORTED || state == QueryState.CANCELED) { LOGGER.debug("[ReleaseQueryResource] state is: {}", state); - Throwable cause = stateMachine.getFailureException(); + cause = stateMachine.getFailureException(); releaseResource(cause); + + this.cleanUpCoordinatorContextMapIfNeeded(cause); } - this.stop(null); + this.stop(cause); } }); this.stopped = new AtomicBoolean(false); @@ -324,32 +331,20 @@ public void stop(Throwable t) { } } - // Stop the query and clean up all the resources this query occupied - public void stopAndCleanup() { - stop(null); - releaseResource(); - } - @Override public void cancel() { - stateMachine.transitionToCanceled( - new KilledByOthersException(), - new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode()) - .setMessage(KilledByOthersException.MESSAGE)); - } - - /** Release the resources that current QueryExecution hold. 
*/ - private void releaseResource() { - // close ResultHandle to unblock client's getResult request - // Actually, we should not close the ResultHandle when the QueryExecution is Finished. - // There are only two scenarios where the ResultHandle should be closed: - // 1. The client fetch all the result and the ResultHandle is finished. - // 2. The client's connection is closed that all owned QueryExecution should be cleaned up - // If the QueryExecution's state is abnormal, we should also abort the resultHandle without - // waiting it to be finished. - if (resultHandle != null) { - resultHandle.close(); - cleanUpResultHandle(); + Throwable cause = new KilledByOthersException(); + boolean cancelled = + stateMachine.transitionToCanceled( + cause, + new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode()) + .setMessage(KilledByOthersException.MESSAGE)); + if (!cancelled) { + // cancel failed, means this query has already in a done state, we can do nothing to change + // the state but clean up the resource if needed + // we don't need to do cleanUpCoordinatorContextMapIfNeeded if cancel succeed, because it will + // be called in callback logic in QueryStateMachine of this QueryExecution + this.cleanUpCoordinatorContextMapIfNeeded(cause); } } @@ -358,7 +353,7 @@ private void cleanUpResultHandle() { // We don't need to deal with MemorySourceHandle because it doesn't register to memory pool // We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream // FragmentInstanceId to register - if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle instanceof SourceHandle) { + if (resultHandle instanceof SourceHandle) { TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId(); MPPDataExchangeService.getInstance() .getMPPDataExchangeManager() @@ -385,7 +380,7 @@ private void releaseResource(Throwable t) { // 2. 
The client's connection is closed that all owned QueryExecution should be cleaned up // If the QueryExecution's state is abnormal, we should also abort the resultHandle without // waiting it to be finished. - if (resultHandle != null) { + if (resultHandle != null && resultHandleCleanUp.compareAndSet(false, true)) { if (t != null) { resultHandle.abort(t); } else { @@ -395,6 +390,30 @@ private void releaseResource(Throwable t) { } } + /** + * Clear up Coordinator.queryExecutionMap by calling Coordinator.cleanupQueryExecution if there is + * no RPC in progress for this query (that is, the current RPC has finished and {@code + * startTimeOfCurrentRpc == -1}). In cases where an RPC is still active, the finally block in + * ClientRPCServiceImpl is responsible for performing the cleanup. + */ + private synchronized void cleanUpCoordinatorContextMapIfNeeded(Throwable t) { + if (!isActive()) { + Coordinator.getInstance().cleanupQueryExecution(context.getLocalQueryId(), null, t); + } + } + + /** + * Check if there is an active RPC for this query. If {@code startTimeOfCurrentRpc == -1}, it + * means there is no active RPC, otherwise there is an active RPC. An active RPC means that the + * client is still fetching results and the QueryExecution should not be cleaned up until the RPC + * finishes. On the other hand, if there is no active RPC, it means that the client has finished + * fetching results or has not started fetching results yet, and the QueryExecution can be safely + * cleaned up. + */ + public synchronized boolean isActive() { + return startTimeOfCurrentRpc != -1; + } + /** * This method will be called by the request thread from client connection. This method will block * until one of these conditions occurs: 1. There is a batch of result 2. 
There is no more result @@ -648,13 +667,22 @@ public long getStartExecutionTime() { } @Override - public void recordExecutionTime(long executionTime) { + public synchronized void recordExecutionTime(long executionTime) { totalExecutionTime += executionTime; + // recordExecutionTime is called after current rpc finished, so we need to set + // startTimeOfCurrentRpc to -1 + this.startTimeOfCurrentRpc = -1; + } + + @Override + public synchronized void updateCurrentRpcStartTime(long startTime) { + this.startTimeOfCurrentRpc = startTime; } @Override - public long getTotalExecutionTime() { - return totalExecutionTime; + public synchronized long getTotalExecutionTime() { + return totalExecutionTime + + (startTimeOfCurrentRpc == -1 ? 0 : System.nanoTime() - startTimeOfCurrentRpc); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java index a3e16a991ec63..e3cf2408f068a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java @@ -72,6 +72,12 @@ public class ConfigExecution implements IQueryExecution { private final StatementType statementType; private long totalExecutionTime; + // -1 if previous rpc is finished and next client req hasn't come yet, unit is ns + // it will be updated in fetchResult rpc + // currently, ConfigExecution will return result is just one call, so this field is not used. 
But + // we will keep it for future use when ConfigExecution may return result in multiple calls + private volatile long startTimeOfCurrentRpc = System.nanoTime(); + public ConfigExecution( MPPQueryContext context, StatementType statementType, @@ -144,11 +150,6 @@ public void stop(Throwable t) { // do nothing } - @Override - public void stopAndCleanup() { - // do nothing - } - @Override public void stopAndCleanup(Throwable t) { // do nothing @@ -244,11 +245,25 @@ public long getStartExecutionTime() { @Override public void recordExecutionTime(long executionTime) { totalExecutionTime += executionTime; + // recordExecutionTime is called after current rpc finished, so we need to set + // startTimeOfCurrentRpc to -1 + this.startTimeOfCurrentRpc = -1; + } + + @Override + public void updateCurrentRpcStartTime(long startTime) { + this.startTimeOfCurrentRpc = startTime; + } + + @Override + public boolean isActive() { + return startTimeOfCurrentRpc != -1; } @Override public long getTotalExecutionTime() { - return totalExecutionTime; + return totalExecutionTime + + (startTimeOfCurrentRpc == -1 ? 
0 : System.nanoTime() - startTimeOfCurrentRpc); } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java index 7f3b5cd8d2d78..76ec370ad98be 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java @@ -1798,6 +1798,14 @@ public long getStartExecutionTime() { @Override public void recordExecutionTime(long executionTime) {} + @Override + public void updateCurrentRpcStartTime(long startTime) {} + + @Override + public boolean isActive() { + return true; + } + @Override public long getTotalExecutionTime() { return 0; @@ -1819,9 +1827,6 @@ public void start() {} @Override public void stop(Throwable t) {} - @Override - public void stopAndCleanup() {} - @Override public void stopAndCleanup(Throwable t) {}