Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;
import static org.apache.iotdb.rpc.TSStatusCode.QUERY_WAS_KILLED;

public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {

Expand Down Expand Up @@ -241,6 +242,9 @@
private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();

private static final String NO_QUERY_EXECUTION_ERR_MSG =
"Query is not found, it may be killed by others, timeout or some other runtime errors, you can see more details in server log.";

Check warning on line 246 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 134).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0kisyxjpajbXtuAn55&open=AZ0kisyxjpajbXtuAn55&pullRequest=17358

@FunctionalInterface
public interface SelectResult {
boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize)
Expand Down Expand Up @@ -1147,15 +1151,18 @@
finished = true;
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
}
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);

queryExecution = COORDINATOR.getQueryExecution(req.queryId);

if (queryExecution == null) {
resp.setHasResultSet(false);
resp.setMoreData(false);
return resp;
TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode());
noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
}

TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);

queryExecution.updateCurrentRpcStartTime(startTime);
statementType = queryExecution.getStatementType();

try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
Expand Down Expand Up @@ -1686,16 +1693,16 @@
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
}

TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);

queryExecution = COORDINATOR.getQueryExecution(req.queryId);
if (queryExecution == null) {
resp.setHasResultSet(false);
resp.setMoreData(true);
return resp;
TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode());
noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
}
queryExecution.updateCurrentRpcStartTime(startTime);
statementType = queryExecution.getStatementType();

TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
Pair<TSQueryDataSet, Boolean> pair =
convertTsBlockByFetchSize(queryExecution, req.fetchSize);
Expand All @@ -1705,7 +1712,7 @@
resp.setHasResultSet(hasResultSet);
resp.setQueryDataSet(result);
resp.setIsAlign(true);
resp.setMoreData(finished);
resp.setMoreData(!finished);
return resp;
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
Expand Down Expand Up @@ -51,7 +52,11 @@ public class MPPQueryContext {
private long localQueryId;
private SessionInfo session;
private QueryType queryType = QueryType.READ;

/** The maximum execution time of the query, in milliseconds. */
private long timeOut;

// time unit is ms
private long startTime;

private TEndPoint localDataBlockEndpoint;
Expand Down Expand Up @@ -95,26 +100,22 @@ public class MPPQueryContext {

private boolean userQuery = false;

/**
 * Creates a context that is initialized from the query id only.
 *
 * <p>Stores {@code queryId}, initializes the endpoint blacklist as an empty concurrent key set,
 * and creates a {@code NotThreadSafeMemoryReservationManager} keyed by the query id and this
 * class's name.
 *
 * @param queryId id of the query this context belongs to
 */
@TestOnly
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
this.endPointBlackList = ConcurrentHashMap.newKeySet();
this.memoryReservationManager =
new NotThreadSafeMemoryReservationManager(queryId, this.getClass().getName());
}

// TODO too many callers just pass a null SessionInfo which should be forbidden
@TestOnly
public MPPQueryContext(
String sql,
QueryId queryId,
SessionInfo session,
TEndPoint localDataBlockEndpoint,
TEndPoint localInternalEndpoint) {
this(queryId);
this.sql = sql;
this.session = session;
this.localDataBlockEndpoint = localDataBlockEndpoint;
this.localInternalEndpoint = localInternalEndpoint;
this.initResultNodeContext();
this(sql, queryId, -1, session, localDataBlockEndpoint, localInternalEndpoint);
}

public MPPQueryContext(
Expand Down Expand Up @@ -182,10 +183,12 @@ public QueryType getQueryType() {
return queryType;
}

/**
 * Returns the maximum execution time of the query.
 *
 * @return the query timeout, in milliseconds
 */
public long getTimeOut() {
return timeOut;
}

/**
 * Sets the maximum execution time of the query.
 *
 * @param timeOut the query timeout, in milliseconds
 */
public void setTimeOut(long timeOut) {
this.timeOut = timeOut;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ public void transitionToCanceled() {
transitionToDoneState(CANCELED);
}

public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) {
public boolean transitionToCanceled(Throwable throwable, TSStatus failureStatus) {
this.failureStatus.compareAndSet(null, failureStatus);
this.failureException.compareAndSet(null, throwable);
transitionToDoneState(CANCELED);
return transitionToDoneState(CANCELED);
}

public void transitionToAborted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.PipelineDriverFactory;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
Expand Down Expand Up @@ -427,6 +428,7 @@ private void cancelTimeoutFlushingInstances() {
+ "ms, and now is in flushing state"));
}
});
Coordinator.getInstance().cleanUpStaleQueries();
}

public ExecutorService getIntoOperationExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
Expand Down Expand Up @@ -241,7 +242,6 @@
return queryExecutionMap.size();
}

// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
private ExecutorService getQueryExecutor() {
int coordinatorReadExecutorSize = CONFIG.getCoordinatorReadExecutorSize();
return IoTDBThreadPoolFactory.newFixedThreadPool(
Expand All @@ -254,7 +254,6 @@
coordinatorWriteExecutorSize, ThreadName.MPP_COORDINATOR_WRITE_EXECUTOR.getName());
}

// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
private ScheduledExecutorService getScheduledExecutor() {
return IoTDBThreadPoolFactory.newScheduledThreadPool(
COORDINATOR_SCHEDULED_EXECUTOR_SIZE,
Expand All @@ -267,12 +266,11 @@

public void cleanupQueryExecution(
Long queryId, org.apache.thrift.TBase<?, ?> nativeApiRequest, Throwable t) {
IQueryExecution queryExecution = getQueryExecution(queryId);
IQueryExecution queryExecution = queryExecutionMap.remove(queryId);
if (queryExecution != null) {
try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) {
LOGGER.debug("[CleanUpQuery]]");
queryExecution.stopAndCleanup(t);
queryExecutionMap.remove(queryId);
if (queryExecution.isQuery() && queryExecution.isUserQuery()) {
long costTime = queryExecution.getTotalExecutionTime();
// print slow query
Expand Down Expand Up @@ -300,6 +298,35 @@
}
}

/**
* We need to reclaim resources from queries that have exceeded their timeout by more than one
* minute. This indicates that the associated clients have failed to perform proper resource
* cleanup.
*/
public void cleanUpStaleQueries() {
long currentTime = System.currentTimeMillis();
queryExecutionMap.forEach(
(queryId, queryExecution) -> {
if (queryExecution.isActive()) {
return;
}
long timeout = queryExecution.getTimeout();
long queryStartTime = queryExecution.getStartExecutionTime();
long executeTime = currentTime - queryStartTime;
if (timeout > 0 && executeTime - 60_000L > timeout) {
LOGGER.warn(
"Cleaning up stale query with id {}, which has been running for {} ms, timeout duration is: {}ms",

Check warning on line 318 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 114).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0pkhMtAUjaTwythKFa&open=AZ0pkhMtAUjaTwythKFa&pullRequest=17358
queryId,
executeTime,
timeout);
cleanupQueryExecution(
queryId,
null,
new QueryTimeoutRuntimeException(queryStartTime, currentTime, timeout));
}
});
}

/**
 * Cleans up the query execution registered under {@code queryId}, passing neither a native API
 * request nor a failure cause.
 *
 * @param queryId id of the query to clean up
 */
public void cleanupQueryExecution(Long queryId) {
cleanupQueryExecution(queryId, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.internal.DeviceSchemaFetchStatement;
import org.apache.iotdb.db.queryengine.plan.statement.internal.SeriesSchemaFetchStatement;
Expand Down Expand Up @@ -239,7 +240,7 @@
return templateMap;
}

private ClusterSchemaTree executeSchemaFetchQuery(

Check failure on line 243 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0pkgfUAUjaTwythKFZ&open=AZ0pkgfUAUjaTwythKFZ&pullRequest=17358
Statement fetchStatement, MPPQueryContext context) {
long queryId = SessionManager.getInstance().requestQueryId();
Throwable t = null;
Expand All @@ -250,29 +251,37 @@
String.format("Fetch Schema failed, because %s", executionResult.status.getMessage()),
executionResult.status.getCode());
}
IQueryExecution queryExecution = coordinator.getQueryExecution(queryId);
try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
ClusterSchemaTree result = new ClusterSchemaTree();
ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer =
new ClusterSchemaTree.SchemaNodeBatchDeserializer();
Set<String> databaseSet = new HashSet<>();
while (coordinator.getQueryExecution(queryId).hasNextResult()) {
// The query will be transited to FINISHED when invoking getBatchResult() at the last time
// So we don't need to clean up it manually
Optional<TsBlock> tsBlock;
try {
tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
} catch (IoTDBException e) {
t = e;
throw new RuntimeException("Fetch Schema failed. ", e);
}
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
break;
}
Column column = tsBlock.get().getColumn(0);
for (int i = 0; i < column.getPositionCount(); i++) {
parseFetchedData(column.getBinary(i), result, deserializer, databaseSet, context);
if (queryExecution != null) {
while (queryExecution.hasNextResult()) {
// The query transitions to FINISHED when getBatchResult() is invoked for the last time,
// so we don't need to clean it up manually.
Optional<TsBlock> tsBlock;
try {
tsBlock = queryExecution.getBatchResult();
} catch (IoTDBException e) {
t = e;
throw new RuntimeException("Fetch Schema failed. ", e);
}
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
break;
}
Column column = tsBlock.get().getColumn(0);
for (int i = 0; i < column.getPositionCount(); i++) {
parseFetchedData(column.getBinary(i), result, deserializer, databaseSet, context);
}
}
} else {
throw new RuntimeException(

Check warning on line 281 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace generic exceptions with specific library exceptions or a custom exception.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0pkgfUAUjaTwythKFY&open=AZ0pkgfUAUjaTwythKFY&pullRequest=17358
String.format("Fetch Schema failed, because queryExecution is null for %s", queryId));
}

result.setDatabases(databaseSet);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@

void stop(Throwable t);

void stopAndCleanup();

void stopAndCleanup(Throwable t);

void cancel();
Expand All @@ -57,10 +55,32 @@

String getQueryId();

// time unit is ms
long getStartExecutionTime();

/**

Check warning on line 61 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Summary javadoc is missing.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0oHLugB0YJ19oEqHr5&open=AZ0oHLugB0YJ19oEqHr5&pullRequest=17358
* Records the given execution time.
*
* @param executionTime the execution time to record, in ns
*/
void recordExecutionTime(long executionTime);

/**

Check warning on line 66 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

First sentence of Javadoc is missing an ending period.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0kisVPjpajbXtuAn54&open=AZ0kisVPjpajbXtuAn54&pullRequest=17358
* Updates the current RPC start time, which is used to calculate the RPC execution time and to
* update the total execution time.
*
* @param startTime start time of current rpc, time unit is ns
*/
void updateCurrentRpcStartTime(long startTime);

/**
* Check if there is an active RPC for this query. If {@code startTimeOfCurrentRpc == -1}, it
* means there is no active RPC, otherwise there is an active RPC. An active RPC means that the
* client is still fetching results and the QueryExecution should not be cleaned up until the RPC
* finishes. On the other hand, if there is no active RPC, it means that the client has finished
* fetching results or has not started fetching results yet, and the QueryExecution can be safely
* cleaned up.
*/
boolean isActive();

/**
* @return cost time in ns
*/
Expand All @@ -69,6 +89,7 @@
/** Returns the IP for a thrift-based client, or the client-id for an MQTT/REST client. */
String getClientHostname();

/** Returns the maximum execution time of the query, in milliseconds. */
long getTimeout();

Optional<String> getExecuteSQL();
Expand Down
Loading
Loading