diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index 2ff390a8e832..5a271c1ead26 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -527,7 +527,8 @@ public void start() { "-XX:MaxDirectMemorySize=" + jvmConfig.getMaxDirectMemorySize() + "m", "-Djdk.nio.maxCachedBufferSize=262144", "-D" + IoTDBConstant.INTEGRATION_TEST_KILL_POINTS + "=" + killPoints.toString(), - "-Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8", + "-Dsun.jnu.encoding=UTF-8", + "-Dfile.encoding=UTF-8", "-cp", server_node_lib_path)); addStartCmdParams(startCmd); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java index 7e3f7dff2cf1..169975450c0b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java @@ -188,6 +188,15 @@ public void testFlushNotExistGroupNoData() { sqe.printStackTrace(); assertTrue(sqe.getMessage().contains(expectedMsg)); } + try { + statement.execute( + "FLUSH root.noexist.nodatagroup1,root.notExistGroup1,root.notExistGroup2 on local"); + } catch (SQLException sqe) { + String expectedMsg = + "500: Database root.noexist.nodatagroup1,root.notExistGroup1,root.notExistGroup2 does not exist on local"; + sqe.printStackTrace(); + assertTrue(sqe.getMessage().contains(expectedMsg)); + } } catch (Exception e) { fail(e.getMessage()); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 5d6aa8da9f5d..4d01f3770c21 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -981,11 +981,11 @@ public TSStatus merge() throws TException { @Override public TSStatus flush(final TFlushReq req) throws TException { if (req.storageGroups != null) { - final List noExistSg = + final List noExistDB = configManager.getPartitionManager().filterUnExistDatabases(req.storageGroups); - if (!noExistSg.isEmpty()) { + if (!noExistDB.isEmpty()) { final StringBuilder sb = new StringBuilder(); - noExistSg.forEach(storageGroup -> sb.append(storageGroup).append(",")); + noExistDB.forEach(database -> sb.append(database).append(",")); return RpcUtils.getStatus( TSStatusCode.DATABASE_NOT_EXIST, "Database " + sb.subSequence(0, sb.length() - 1) + " does not exist"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 3aa7aad7e4fe..c2c3a8d16cd6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -2433,7 +2433,7 @@ public TSStatus stopRepairData() throws TException { @Override public TSStatus flush(TFlushReq req) throws TException { try { - storageEngine.operateFlush(req); + storageEngine.operateFlush(req, false); } catch (Exception e) { return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index d0627351b273..fc28d7a5f327 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -1264,8 +1264,7 @@ public SettableFuture flush( } } else { try { - StorageEngine.getInstance().operateFlush(tFlushReq); - tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + tsStatus = StorageEngine.getInstance().operateFlush(tFlushReq, true); } catch (final Exception e) { tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index 59fefd8d169f..70dc79b6adb3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -1574,17 +1574,17 @@ public Node visitRemoveAINodeStatement(RelationalSqlParser.RemoveAINodeStatement @Override public Node visitFlushStatement(final RelationalSqlParser.FlushStatementContext ctx) { final FlushStatement flushStatement = new FlushStatement(StatementType.FLUSH); - List storageGroups = null; + List databases = null; if (ctx.booleanValue() != null) { flushStatement.setSeq(Boolean.parseBoolean(ctx.booleanValue().getText())); } flushStatement.setOnCluster( ctx.localOrClusterMode() == null || ctx.localOrClusterMode().LOCAL() == null); if (ctx.identifier() != null) { - storageGroups = + databases = getIdentifiers(ctx.identifier()).stream().map(Identifier::getValue).collect(toList()); } - flushStatement.setDatabases(storageGroups); + flushStatement.setDatabases(databases); return new Flush(flushStatement, null); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index fc2f9a2cacdf..2a1c21de0405 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -42,6 +42,7 @@ import org.apache.iotdb.commons.service.IService; import org.apache.iotdb.commons.service.ServiceType; import org.apache.iotdb.commons.utils.PathUtils; +import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.consensus.ConsensusFactory; @@ -555,6 +556,11 @@ public void syncCloseProcessorsInRegion(List dataRegionIds) { checkResults(tasks, "Failed to sync close processor."); } + public boolean containsDatabase(final String database) { + return dataRegionMap.values().stream() + .anyMatch(dataRegion -> Objects.equals(database, dataRegion.getDatabaseName())); + } + public void syncCloseProcessorsInDatabase(String databaseName, boolean isSeq) { List> tasks = new ArrayList<>(); for (DataRegion dataRegion : dataRegionMap.values()) { @@ -662,22 +668,37 @@ public void recoverRepairData() { } } - public void operateFlush(TFlushReq req) { + public TSStatus operateFlush(final TFlushReq req, final boolean onLocal) { + final StorageEngine storageEngine = StorageEngine.getInstance(); if (req.getRegionIds() != null && !req.getRegionIds().isEmpty()) { - StorageEngine.getInstance().syncCloseProcessorsInRegion(req.getRegionIds()); + storageEngine.syncCloseProcessorsInRegion(req.getRegionIds()); } else if (req.storageGroups == null || req.storageGroups.isEmpty()) { StorageEngine.getInstance().syncCloseAllProcessor(); WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes(); } else { + if (onLocal) { + final List noExistDB = + req.storageGroups.stream() + .filter(database -> !storageEngine.containsDatabase(database)) + .collect(Collectors.toList()); + if (!noExistDB.isEmpty()) { + final StringBuilder sb = new StringBuilder(); + noExistDB.forEach(database -> sb.append(database).append(",")); + return RpcUtils.getStatus( + TSStatusCode.DATABASE_NOT_EXIST, + "Database " + sb.subSequence(0, sb.length() - 1) + " does not exist on local"); + } + } for (String databaseName : req.storageGroups) { if (req.isSeq == null) { - StorageEngine.getInstance().syncCloseProcessorsInDatabase(databaseName); + storageEngine.syncCloseProcessorsInDatabase(databaseName); } else { - StorageEngine.getInstance() - .syncCloseProcessorsInDatabase(databaseName, Boolean.parseBoolean(req.isSeq)); + storageEngine.syncCloseProcessorsInDatabase( + databaseName, Boolean.parseBoolean(req.isSeq)); } } } + return StatusUtils.OK; } public void clearCache() {