Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
private Long sleepTime;
private Configuration configuration;

public enum GeneratorFunctionType {
LOAD,
CACHE_RATIO
}
private float lowCacheRatioThreshold;
private float potentialCacheRatioAfterMove;
private float minFreeCacheSpaceFactor;

@Override
public void loadConf(Configuration configuration) {
Expand All @@ -101,6 +100,12 @@ public void loadConf(Configuration configuration) {
ratioThreshold =
this.configuration.getFloat(CACHE_RATIO_THRESHOLD, CACHE_RATIO_THRESHOLD_DEFAULT);
sleepTime = configuration.getLong(MOVE_THROTTLING, MOVE_THROTTLING_DEFAULT.toMillis());
lowCacheRatioThreshold = configuration.getFloat(LOW_CACHE_RATIO_FOR_RELOCATION_KEY,
LOW_CACHE_RATIO_FOR_RELOCATION_DEFAULT);
potentialCacheRatioAfterMove = configuration.getFloat(POTENTIAL_CACHE_RATIO_AFTER_MOVE_KEY,
POTENTIAL_CACHE_RATIO_AFTER_MOVE_DEFAULT);
minFreeCacheSpaceFactor =
configuration.getFloat(MIN_FREE_CACHE_SPACE_FACTOR_KEY, MIN_FREE_CACHE_SPACE_FACTOR_DEFAULT);
}

@Override
Expand Down Expand Up @@ -310,6 +315,38 @@ protected BalanceAction generate(BalancerClusterState cluster) {
regionCacheRatioOnOldServerMap.remove(regionEncodedName);
return action;
}
return generatePlanForFreeCacheSpace(cluster);
}

private BalanceAction generatePlanForFreeCacheSpace(BalancerClusterState cluster) {
if (cluster.serverBlockCacheFreeSize == null) {
return BalanceAction.NULL_ACTION;
}
for (int region = 0; region < cluster.numRegions; region++) {
RegionInfo regionInfo = cluster.regions[region];
if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
continue;
}
int currentServer = cluster.regionIndexToServerIndex[region];
float ratio = cluster.getObservedRegionCacheRatio(region);
if (ratio >= lowCacheRatioThreshold) {
continue;
}
int regionSizeMb = cluster.getTotalRegionHFileSizeMB(region);
if (regionSizeMb <= 0) {
continue;
}
long bytesNeeded = (long) (regionSizeMb * 1024L * 1024L * minFreeCacheSpaceFactor);
for (int server = 0; server < cluster.numServers; server++) {
// Skips current server for region, as we can't generate a move to same server
if (server == currentServer) {
continue;
}
if (cluster.serverBlockCacheFreeSize[server] >= bytesNeeded) {
return getAction(currentServer, region, server, -1);
}
}
}
return BalanceAction.NULL_ACTION;
}

Expand All @@ -319,7 +356,7 @@ private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex
return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
: BalanceAction.NULL_ACTION;
: generatePlanForFreeCacheSpace(cluster);
}

private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -129,6 +130,74 @@ public void testRegionsNotCachedOnOldServerAndCurrentServer() throws Exception {
assertEquals(5, targetServers.get(server1).size());
}

/**
* Regions on the overloaded RS report low block-cache ratio; no RS reports prefetch/historical
* cache for those regions (so {@link CacheAwareLoadBalancer.CacheAwareCandidateGenerator} has no
* "old server" to prefer). Another RS has ample free block cache. The balancer should still emit
* plans that shed load from the hot RS onto the idle RS with spare cache capacity.
*/
@Test
public void testLowCacheRatioNoHistoricalCacheRelocatesWhenTargetHasFreeBlockCache()
throws Exception {
Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
ServerName server0 = servers.get(0);
ServerName server1 = servers.get(1);
ServerName server2 = servers.get(2);

List<RegionInfo> regionsOnServer0 = randomRegions(10);
List<RegionInfo> regionsOnServer1 = randomRegions(0);
List<RegionInfo> regionsOnServer2 = randomRegions(5);

clusterState.put(server0, regionsOnServer0);
clusterState.put(server1, regionsOnServer1);
clusterState.put(server2, regionsOnServer2);

// Below LOW_CACHE_RATIO_FOR_RELOCATION_DEFAULT (0.35);
ServerMetrics sm0 = mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, 0.1f,
new ArrayList<>(), 0, 10);
when(sm0.getCacheFreeSize()).thenReturn(0L);
ServerMetrics sm1 = mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, 0.0f,
new ArrayList<>(), 0, 10);
// Simulates 1GB free cache space on server1
when(sm1.getCacheFreeSize()).thenReturn(1024L * 1024 * 1024);
ServerMetrics sm2 = mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, 1.0f,
new ArrayList<>(), 0, 10);
when(sm2.getCacheFreeSize()).thenReturn(0L);

Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
serverMetricsMap.put(server0, sm0);
serverMetricsMap.put(server1, sm1);
serverMetricsMap.put(server2, sm2);
ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
loadBalancer.updateClusterMetrics(clusterMetrics);

CacheAwareLoadBalancer internalBalancer =
(CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
assertNotNull(internalBalancer);
assertTrue(internalBalancer.regionCacheRatioOnOldServerMap.isEmpty());

Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
(Map) mockClusterServersWithTables(clusterState);
List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTable);
assertNotNull(plans);

Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
for (RegionPlan plan : plans) {
if (plan.getSource().equals(server0)) {
regionsMovedFromServer0.add(plan.getRegionInfo());
if (!targetServers.containsKey(plan.getDestination())) {
targetServers.put(plan.getDestination(), new ArrayList<>());
}
targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
}
}
assertEquals(5, regionsMovedFromServer0.size());
assertNotNull(targetServers.get(server1));
assertEquals(5, targetServers.get(server1).size());
}

@Test
public void testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() throws Exception {
// The regions are partially cached on old server but not cached on the current server
Expand Down