diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java index c011cd994a22e..3119b04846f94 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.util.List; +import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_DATA_DIRS; + public class MppDataNodeConfig extends MppBaseConfig implements DataNodeConfig { public MppDataNodeConfig() { @@ -80,6 +82,12 @@ public DataNodeConfig setConnectionTimeoutInMS(int connectionTimeoutInMS) { return this; } + @Override + public DataNodeConfig setDnDataDirs(String dnDataDirs) { + setProperty(DN_DATA_DIRS, dnDataDirs); + return this; + } + @Override public DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes( long loadTsFileAnalyzeSchemaMemorySizeInBytes) { diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java index bf4189d599b36..4b427b6a3d5d2 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java @@ -42,7 +42,6 @@ import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_DATA_NODE_PROPERTIES; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_CONNECTION_TIMEOUT_MS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_CONSENSUS_DIR; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_DATA_DIRS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_DATA_REGION_CONSENSUS_PORT; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_JOIN_CLUSTER_RETRY_INTERVAL_MS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_METRIC_INTERNAL_REPORTER_TYPE; @@ -117,7 +116,6 @@ public DataNodeWrapper( immutableNodeProperties.setProperty(IoTDBConstant.DN_SEED_CONFIG_NODE, seedConfigNode); immutableNodeProperties.setProperty(DN_SYSTEM_DIR, MppBaseConfig.NULL_VALUE); - immutableNodeProperties.setProperty(DN_DATA_DIRS, MppBaseConfig.NULL_VALUE); immutableNodeProperties.setProperty(DN_CONSENSUS_DIR, MppBaseConfig.NULL_VALUE); immutableNodeProperties.setProperty(DN_WAL_DIRS, MppBaseConfig.NULL_VALUE); immutableNodeProperties.setProperty(DN_TRACING_DIR, MppBaseConfig.NULL_VALUE); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java index 46947add5963f..a006280bb4bd0 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java @@ -48,6 +48,11 @@ public DataNodeConfig setConnectionTimeoutInMS(int connectionTimeoutInMS) { return this; } + @Override + public DataNodeConfig setDnDataDirs(String dnDataDirs) { + return this; + } + @Override public DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes( long loadTsFileAnalyzeSchemaMemorySizeInBytes) { diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index bede2b001f709..3520897b0db56 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -112,6 +112,11 @@ default Connection getConnection(Constant.Version version) throws SQLException { return getConnection(version, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD); } + default Connection getConnection(DataNodeWrapper dataNodeWrapper) throws SQLException { + return getConnection( + dataNodeWrapper, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD); + } + Connection getConnection(Constant.Version version, String username, String password) throws SQLException; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java index 9e4c62022bd6f..dd64fb14a5e27 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java @@ -33,6 +33,8 @@ public interface DataNodeConfig { DataNodeConfig setConnectionTimeoutInMS(int connectionTimeoutInMS); + DataNodeConfig setDnDataDirs(String dnDataDirs); + DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes( long loadTsFileAnalyzeSchemaMemorySizeInBytes); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java new file mode 100644 index 0000000000000..8cae770b87263 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1; + +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; + +import org.apache.tsfile.utils.Pair; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes; +import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.MULTI_DATA_DIRS; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitModsVisibleOnReplicas; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitRegionReplicas; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileResourceVisibleOnReplicas; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileVisibleOnReplicas; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.getReplicaDataNodeIds; + +/** + * Tree-model coverage for IoTConsensus region migration over multiple data dirs: a deletion (mods) + * must survive the snapshot transfer to the migrated peer. With several data dirs the snapshot + * fragments of one TsFile can be received into different folders, so the receiver groups companion + * files and the loader relinks them into one data dir; if that breaks, the migrated replica loses + * the deletion. + */ +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBRegionMigrateWithDeletionMultiDataDirIT { + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(2) + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS); + EnvFactory.getEnv().getConfig().getDataNodeConfig().setDnDataDirs(MULTI_DATA_DIRS); + EnvFactory.getEnv().initClusterEnvironment(1, 3); + } + + @After + public void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testRegionMigratePreservesDeletionWithMultiDataDirs() throws Exception { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("CREATE DATABASE root.db"); + statement.execute( + "INSERT INTO root.db.d1(timestamp, s1) VALUES (100, 100), (200, 200), (300, 300)"); + statement.execute("FLUSH"); + + Map>> dataRegionMapWithLeader = + getDataRegionMapWithLeader(statement); + int dataRegionIdForTest = + dataRegionMapWithLeader.keySet().stream() + .max(Integer::compareTo) + .orElseThrow(() -> new AssertionError("No DataRegion found")); + Set initialReplicaDataNodeIds = + getReplicaDataNodeIds(statement, dataRegionIdForTest); + + awaitTsFileVisibleOnReplicas("root.db", dataRegionIdForTest, initialReplicaDataNodeIds); + awaitTsFileResourceVisibleOnReplicas( + statement, "root.db", dataRegionIdForTest, initialReplicaDataNodeIds); + + statement.execute("DELETE FROM root.db.d1.s1 WHERE time <= 200"); + statement.execute("FLUSH"); + awaitModsVisibleOnReplicas("root.db", dataRegionIdForTest, initialReplicaDataNodeIds); + assertDeletionVisibleOnAllReplicas(dataRegionIdForTest, 1); + + Pair> leaderAndNodes = dataRegionMapWithLeader.get(dataRegionIdForTest); + Set allDataNodes = getAllDataNodes(statement); + int leaderId = leaderAndNodes.getLeft(); + int followerId = + leaderAndNodes.getRight().stream() + .filter(id -> id != leaderId) + .findFirst() + .orElseThrow(() -> new AssertionError("No follower DataNode found")); + int destDataNodeId = + allDataNodes.stream() + .filter(id -> id != leaderId && id != followerId) + .findFirst() + .orElseThrow(() -> new AssertionError("No destination DataNode found")); + + statement.execute( + String.format( + "migrate region %d from %d to %d", dataRegionIdForTest, leaderId, destDataNodeId)); + + Set migratedReplicaDataNodeIds = new HashSet<>(); + migratedReplicaDataNodeIds.add(followerId); + migratedReplicaDataNodeIds.add(destDataNodeId); + awaitRegionReplicas(statement, dataRegionIdForTest, migratedReplicaDataNodeIds); + awaitModsVisibleOnReplicas( + "root.db", dataRegionIdForTest, Collections.singleton(destDataNodeId)); + + assertDeletionVisibleOnAllReplicas(dataRegionIdForTest, 1); + } + } + + private void assertDeletionVisibleOnAllReplicas(int dataRegionId, int expectedCount) + throws Exception { + Set replicaDataNodeIds; + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + replicaDataNodeIds = getReplicaDataNodeIds(statement, dataRegionId); + } + for (int dataNodeId : replicaDataNodeIds) { + DataNodeWrapper dataNodeWrapper = + EnvFactory.getEnv() + .dataNodeIdToWrapper(dataNodeId) + .orElseThrow(() -> new AssertionError("DataNode " + dataNodeId + " not found")); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .pollDelay(500, TimeUnit.MILLISECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> assertDeletionVisibleOnReplica(dataNodeWrapper, expectedCount)); + } + } + + private void assertDeletionVisibleOnReplica(DataNodeWrapper dataNodeWrapper, int expectedCount) + throws Exception { + try (Connection connection = EnvFactory.getEnv().getConnection(dataNodeWrapper); + Statement dataNodeStatement = connection.createStatement()) { + try (ResultSet countResultSet = + dataNodeStatement.executeQuery("SELECT COUNT(s1) FROM root.db.d1")) { + Assert.assertTrue(countResultSet.next()); + Assert.assertEquals(expectedCount, countResultSet.getLong(1)); + } + try (ResultSet deletedRangeResultSet = + dataNodeStatement.executeQuery("SELECT s1 FROM root.db.d1 WHERE time <= 200")) { + Assert.assertFalse(deletedRangeResultSet.next()); + } + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/RegionMigrateFileAssertions.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/RegionMigrateFileAssertions.java new file mode 100644 index 0000000000000..e125cbeda71de --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/RegionMigrateFileAssertions.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1; + +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; + +import org.awaitility.Awaitility; +import org.junit.Assert; + +import java.io.File; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +final class RegionMigrateFileAssertions { + + static final String MULTI_DATA_DIRS = + "data/datanode/data/disk0,data/datanode/data/disk1,data/datanode/data/disk2"; + + private static final String SEQUENCE_FOLDER = "sequence"; + private static final String TSFILE_SUFFIX = ".tsfile"; + private static final String TSFILE_RESOURCE_SUFFIX = ".tsfile.resource"; + private static final String MODS_SUFFIX = ModificationFile.FILE_SUFFIX; + + private RegionMigrateFileAssertions() {} + + static void awaitTsFileVisibleOnReplicas( + String database, int dataRegionId, Set dataNodeIds) { + awaitFileVisibleOnReplicas(database, dataRegionId, dataNodeIds, TSFILE_SUFFIX); + } + + static void awaitTsFileResourceVisibleOnReplicas( + String database, int dataRegionId, Set dataNodeIds) { + awaitFileVisibleOnReplicas(database, dataRegionId, dataNodeIds, TSFILE_RESOURCE_SUFFIX); + } + + static void awaitTsFileResourceVisibleOnReplicas( + Statement statement, String database, int dataRegionId, Set dataNodeIds) { + awaitFileVisibleOnReplicas( + statement, database, dataRegionId, dataNodeIds, TSFILE_RESOURCE_SUFFIX); + } + + static void awaitModsVisibleOnReplicas( + String database, int dataRegionId, Set dataNodeIds) { + awaitFileVisibleOnReplicas(database, dataRegionId, dataNodeIds, MODS_SUFFIX); + } + + static void awaitRegionReplicas( + Statement statement, int dataRegionId, Set expectedReplicaDataNodeIds) { + Awaitility.await() + .atMost(10, TimeUnit.MINUTES) + .pollDelay(1, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .untilAsserted( + () -> + Assert.assertEquals( + expectedReplicaDataNodeIds, getReplicaDataNodeIds(statement, dataRegionId))); + } + + static Set getReplicaDataNodeIds(Statement statement, int dataRegionId) + throws Exception { + Set replicaDataNodeIds = new HashSet<>(); + try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) { + while (showRegions.next()) { + if ("DataRegion".equals(showRegions.getString("Type")) + && showRegions.getInt("RegionId") == dataRegionId) { + replicaDataNodeIds.add(showRegions.getInt("DataNodeId")); + } + } + } + Assert.assertFalse(replicaDataNodeIds.isEmpty()); + return replicaDataNodeIds; + } + + private static void awaitFileVisibleOnReplicas( + String database, int dataRegionId, Set dataNodeIds, String suffix) { + awaitFileVisibleOnReplicas(null, database, dataRegionId, dataNodeIds, suffix); + } + + private static void awaitFileVisibleOnReplicas( + Statement flushStatement, + String database, + int dataRegionId, + Set dataNodeIds, + String suffix) { + for (int dataNodeId : dataNodeIds) { + DataNodeWrapper dataNodeWrapper = + EnvFactory.getEnv() + .dataNodeIdToWrapper(dataNodeId) + .orElseThrow(() -> new AssertionError("DataNode " + dataNodeId + " not found")); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .pollDelay(500, TimeUnit.MILLISECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted( + () -> { + if (flushStatement != null) { + flushStatement.execute("FLUSH"); + } + Assert.assertTrue( + String.format( + "Expected file with suffix %s for database %s region %d on DataNode %d", + suffix, database, dataRegionId, dataNodeId), + containsSequenceFileWithSuffix( + dataNodeWrapper, database, dataRegionId, suffix)); + }); + } + } + + private static boolean containsSequenceFileWithSuffix( + DataNodeWrapper dataNodeWrapper, String database, int dataRegionId, String suffix) { + for (String dataDir : MULTI_DATA_DIRS.split(",")) { + File regionDir = + new File( + dataNodeWrapper.getNodePath(), + dataDir + + File.separator + + SEQUENCE_FOLDER + + File.separator + + database + + File.separator + + dataRegionId); + if (containsFileWithSuffix(regionDir, suffix)) { + return true; + } + } + return false; + } + + private static boolean containsFileWithSuffix(File file, String suffix) { + if (!file.exists()) { + return false; + } + if (file.isFile()) { + // IoTConsensus followers can create an open zero-byte TsFile before a later FLUSH closes it. + return file.getName().endsWith(suffix) && (TSFILE_SUFFIX.equals(suffix) || file.length() > 0); + } + File[] children = file.listFiles(); + if (children == null) { + return false; + } + for (File child : children) { + if (containsFileWithSuffix(child, suffix)) { + return true; + } + } + return false; + } +}