Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,8 @@ public void start() {
"-XX:MaxDirectMemorySize=" + jvmConfig.getMaxDirectMemorySize() + "m",
"-Djdk.nio.maxCachedBufferSize=262144",
"-D" + IoTDBConstant.INTEGRATION_TEST_KILL_POINTS + "=" + killPoints.toString(),
"-Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8",
"-Dsun.jnu.encoding=UTF-8",
"-Dfile.encoding=UTF-8",
"-cp",
server_node_lib_path));
addStartCmdParams(startCmd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,15 @@ public void testFlushNotExistGroupNoData() {
sqe.printStackTrace();
assertTrue(sqe.getMessage().contains(expectedMsg));
}
try {
statement.execute(
"FLUSH root.noexist.nodatagroup1,root.notExistGroup1,root.notExistGroup2 on local");
} catch (SQLException sqe) {
String expectedMsg =
"500: Database root.noexist.nodatagroup1,root.notExistGroup1,root.notExistGroup2 does not exist on local";
sqe.printStackTrace();
assertTrue(sqe.getMessage().contains(expectedMsg));
}
} catch (Exception e) {
fail(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,11 +981,11 @@ public TSStatus merge() throws TException {
@Override
public TSStatus flush(final TFlushReq req) throws TException {
if (req.storageGroups != null) {
final List<String> noExistSg =
final List<String> noExistDB =
configManager.getPartitionManager().filterUnExistDatabases(req.storageGroups);
if (!noExistSg.isEmpty()) {
if (!noExistDB.isEmpty()) {
final StringBuilder sb = new StringBuilder();
noExistSg.forEach(storageGroup -> sb.append(storageGroup).append(","));
noExistDB.forEach(database -> sb.append(database).append(","));
return RpcUtils.getStatus(
TSStatusCode.DATABASE_NOT_EXIST,
"Database " + sb.subSequence(0, sb.length() - 1) + " does not exist");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2433,7 +2433,7 @@ public TSStatus stopRepairData() throws TException {
@Override
public TSStatus flush(TFlushReq req) throws TException {
try {
storageEngine.operateFlush(req);
storageEngine.operateFlush(req, false);
} catch (Exception e) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1264,8 +1264,7 @@
}
} else {
try {
StorageEngine.getInstance().operateFlush(tFlushReq);
tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
tsStatus = StorageEngine.getInstance().operateFlush(tFlushReq, true);
} catch (final Exception e) {
tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
Expand Down Expand Up @@ -2309,7 +2308,7 @@
}

@Override
public SettableFuture<ConfigTaskResult> alterPipe(final AlterPipeStatement alterPipeStatement) {

Check warning on line 2311 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 178 to 64, Complexity from 21 to 14, Nesting Level from 4 to 2, Number of Variables from 22 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0oaF8NGCUUzfFYwnE_&open=AZ0oaF8NGCUUzfFYwnE_&pullRequest=17365
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();

// Validate pipe name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1574,17 +1574,17 @@
@Override
public Node visitFlushStatement(final RelationalSqlParser.FlushStatementContext ctx) {
final FlushStatement flushStatement = new FlushStatement(StatementType.FLUSH);
List<String> storageGroups = null;
List<String> databases = null;
if (ctx.booleanValue() != null) {
flushStatement.setSeq(Boolean.parseBoolean(ctx.booleanValue().getText()));
}
flushStatement.setOnCluster(
ctx.localOrClusterMode() == null || ctx.localOrClusterMode().LOCAL() == null);
if (ctx.identifier() != null) {
storageGroups =
databases =
getIdentifiers(ctx.identifier()).stream().map(Identifier::getValue).collect(toList());
}
flushStatement.setDatabases(storageGroups);
flushStatement.setDatabases(databases);
return new Flush(flushStatement, null);
}

Expand Down Expand Up @@ -1978,7 +1978,7 @@
}

@Override
public Node visitGrantStatement(RelationalSqlParser.GrantStatementContext ctx) {

Check warning on line 1981 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 81 to 64, Complexity from 29 to 14, Nesting Level from 4 to 2, Number of Variables from 10 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0oaF9wGCUUzfFYwnFA&open=AZ0oaF9wGCUUzfFYwnFA&pullRequest=17365
boolean toUser;
String name;
toUser = ctx.holderType().getText().equalsIgnoreCase("user");
Expand Down Expand Up @@ -2063,7 +2063,7 @@
throw new SemanticException("author statement parser error");
}

public Node visitRevokeStatement(RelationalSqlParser.RevokeStatementContext ctx) {

Check warning on line 2066 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 89 to 64, Complexity from 27 to 14, Nesting Level from 4 to 2, Number of Variables from 9 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0oaF9wGCUUzfFYwnFB&open=AZ0oaF9wGCUUzfFYwnFB&pullRequest=17365
boolean fromUser;
String name;
fromUser = ctx.holderType().getText().equalsIgnoreCase("user");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
Expand Down Expand Up @@ -555,6 +556,11 @@
checkResults(tasks, "Failed to sync close processor.");
}

public boolean containsDatabase(final String database) {
return dataRegionMap.values().stream()
.anyMatch(dataRegion -> Objects.equals(database, dataRegion.getDatabaseName()));
}

public void syncCloseProcessorsInDatabase(String databaseName, boolean isSeq) {
List<Future<Void>> tasks = new ArrayList<>();
for (DataRegion dataRegion : dataRegionMap.values()) {
Expand Down Expand Up @@ -662,22 +668,37 @@
}
}

public void operateFlush(TFlushReq req) {
public TSStatus operateFlush(final TFlushReq req, final boolean onLocal) {

Check failure on line 671 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0oaFzAGCUUzfFYwnE-&open=AZ0oaFzAGCUUzfFYwnE-&pullRequest=17365
final StorageEngine storageEngine = StorageEngine.getInstance();
if (req.getRegionIds() != null && !req.getRegionIds().isEmpty()) {
StorageEngine.getInstance().syncCloseProcessorsInRegion(req.getRegionIds());
storageEngine.syncCloseProcessorsInRegion(req.getRegionIds());
} else if (req.storageGroups == null || req.storageGroups.isEmpty()) {
StorageEngine.getInstance().syncCloseAllProcessor();
WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
} else {
if (onLocal) {
final List<String> noExistDB =
req.storageGroups.stream()
.filter(database -> !storageEngine.containsDatabase(database))
.collect(Collectors.toList());
if (!noExistDB.isEmpty()) {
final StringBuilder sb = new StringBuilder();
noExistDB.forEach(database -> sb.append(database).append(","));
return RpcUtils.getStatus(
TSStatusCode.DATABASE_NOT_EXIST,
"Database " + sb.subSequence(0, sb.length() - 1) + " does not exist on local");
}
}
for (String databaseName : req.storageGroups) {
if (req.isSeq == null) {
StorageEngine.getInstance().syncCloseProcessorsInDatabase(databaseName);
storageEngine.syncCloseProcessorsInDatabase(databaseName);
} else {
StorageEngine.getInstance()
.syncCloseProcessorsInDatabase(databaseName, Boolean.parseBoolean(req.isSeq));
storageEngine.syncCloseProcessorsInDatabase(
databaseName, Boolean.parseBoolean(req.isSeq));
}
}
}
return StatusUtils.OK;
}

public void clearCache() {
Expand Down
Loading