Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -51,6 +52,7 @@
@RunWith(IoTDBTestRunner.class)
public class IoTDBMigrateMultiRegionForIoTV1IT extends IoTDBRegionOperationReliabilityITFramework {
private static final String MULTI_REGION_MIGRATE_FORMAT = "migrate region %s from %d to %d";
private static final String EXPAND_FORMAT = "extend region %d to %d";

private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBMigrateMultiRegionForIoTV1IT.class);
Expand Down Expand Up @@ -83,6 +85,10 @@ public void multiRegionMigrateTest() throws Exception {
Map<Integer, Set<Integer>> regionMap = getAllRegionMap(statement);
Set<Integer> allDataNodeId = getAllDataNodes(statement);

// INSERTION1 only creates one schema region and one data region; with replication factor 1
// they are often placed on different DataNodes. Colocate them before multi-region migrate.
regionMap = ensureDataNodeHostsMultipleRegions(statement, client, regionMap);

// With replication factor 1 every region lives on exactly one DataNode. Pick a source
// DataNode that hosts at least two regions, then migrate all its regions to a fresh
// destination DataNode.
Expand Down Expand Up @@ -170,6 +176,71 @@ public void multiRegionMigrateTest() throws Exception {
}
}

private Map<Integer, Set<Integer>> ensureDataNodeHostsMultipleRegions(
Statement statement,
SyncConfigNodeIServiceClient client,
Map<Integer, Set<Integer>> regionMap)
throws Exception {
if (hasDataNodeHostingMultipleRegions(regionMap)) {
return regionMap;
}
List<Integer> regionIds = new ArrayList<>(regionMap.keySet());
Assert.assertTrue("Need at least two regions to colocate", regionIds.size() >= 2);

int firstRegion = regionIds.get(0);
int secondRegion = regionIds.get(1);
int targetDataNode = regionMap.get(secondRegion).iterator().next();
if (regionMap.get(firstRegion).contains(targetDataNode)) {
targetDataNode = regionMap.get(firstRegion).iterator().next();
regionGroupExpand(statement, client, secondRegion, targetDataNode);
} else {
regionGroupExpand(statement, client, firstRegion, targetDataNode);
}
return getAllRegionMap(statement);
}

private boolean hasDataNodeHostingMultipleRegions(Map<Integer, Set<Integer>> regionMap) {
return regionMap.values().stream()
.flatMap(Set::stream)
.collect(Collectors.groupingBy(dataNodeId -> dataNodeId, Collectors.counting()))
.values()
.stream()
.anyMatch(count -> count >= 2);
}

private void regionGroupExpand(
Statement statement,
SyncConfigNodeIServiceClient client,
int selectedRegion,
int targetDataNode)
throws Exception {
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.until(
() -> {
statement.execute(String.format(EXPAND_FORMAT, selectedRegion, targetDataNode));
return true;
});

Predicate<TShowRegionResp> expandRegionPredicate =
tShowRegionResp -> {
Map<Integer, Set<Integer>> newRegionMap =
getRunningRegionMap(tShowRegionResp.getRegionInfoList());
Set<Integer> dataNodes = newRegionMap.get(selectedRegion);
return dataNodes != null && dataNodes.contains(targetDataNode);
};

awaitUntilSuccess(
client,
selectedRegion,
expandRegionPredicate,
Optional.of(targetDataNode),
Optional.empty());

LOGGER.info("Region {} has expanded to DataNode {}", selectedRegion, targetDataNode);
}

private int selectDataNodeHostingMultipleRegions(Map<Integer, Set<Integer>> regionMap) {
Map<Integer, Long> regionCountPerDataNode =
regionMap.values().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;
package org.apache.iotdb.confignode.it.regionmigration.pass.commit;

import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.EnvFactory;
Expand All @@ -43,12 +43,12 @@

import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.MULTI_DATA_DIRS;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitModsVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitRegionReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileResourceVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.getReplicaDataNodeIds;
import static org.apache.iotdb.confignode.it.regionmigration.pass.commit.RegionMigrateFileAssertions.MULTI_DATA_DIRS;
import static org.apache.iotdb.confignode.it.regionmigration.pass.commit.RegionMigrateFileAssertions.awaitModsVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.commit.RegionMigrateFileAssertions.awaitRegionReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.commit.RegionMigrateFileAssertions.awaitTsFileResourceVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.commit.RegionMigrateFileAssertions.awaitTsFileVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.commit.RegionMigrateFileAssertions.getReplicaDataNodeIds;

/**
* Tree-model coverage for IoTConsensus region migration over multiple data dirs: a deletion (mods)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;
package org.apache.iotdb.confignode.it.regionmigration.pass.commit;

import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.isession.SessionConfig;
Expand Down Expand Up @@ -45,12 +45,12 @@

import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.MULTI_DATA_DIRS;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitModsVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitRegionReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileResourceVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.getReplicaDataNodeIds;
import static org.apache.iotdb.confignode.it.regionmigration.pass.commit.RegionMigrateFileAssertions.MULTI_DATA_DIRS;
import static org.apache.iotdb.confignode.it.regionmigration.pass.commit.RegionMigrateFileAssertions.awaitModsVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.commit.RegionMigrateFileAssertions.awaitRegionReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.commit.RegionMigrateFileAssertions.awaitTsFileResourceVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.commit.RegionMigrateFileAssertions.awaitTsFileVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.commit.RegionMigrateFileAssertions.getReplicaDataNodeIds;

/**
* Table-model twin of {@link IoTDBRegionMigrateWithDeletionMultiDataDirIT}: a deletion (mods) must
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;
package org.apache.iotdb.confignode.it.regionmigration.pass.commit;

import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
Expand Down
Loading