Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -106,4 +107,21 @@ default String getVersion() {
* rounded to MB
*/
Map<String, Integer> getRegionCachedInfo();

/**
* The available cache space on this region server (bytes), if reported in the server load.
*/
default long getCacheFreeSize() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ServerMetrics#getCacheFreeSize() defaults to 0L, and getRegionColdDataSize() defaults to empty map. That is safe for compatibility, but it means missing metrics are indistinguishable from “no free cache” or “no cold data.” Should missing cache-free-size be represented differently from actual zero? Would 0L make old / partially upgraded servers look like they have no free cache and bias balancing?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, since there would be no mean to determine the free cache space on those, it would exclude those as potential candidates, reducing the odds of finding a better assignment plan, so it may perform less moves, or worst case, no moves at all.

return 0L;
}

/**
* Returns the region cold data information for the regions hosted on this server. Here, cold data
* refers only to region data that is classified as cold by the DataTieringManager according to
* the configured priority logic. These data should be kept out of block cache.
* @return map of region encoded name and its total cold data size, rounded to MB
*/
default Map<String, Integer> getRegionColdDataSize() {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static ServerMetrics toServerMetrics(ServerName serverName,

public static ServerMetrics toServerMetrics(ServerName serverName, int versionNumber,
String version, ClusterStatusProtos.ServerLoad serverLoadPB) {
return ServerMetricsBuilder.newBuilder(serverName)
ServerMetricsBuilder builder = ServerMetricsBuilder.newBuilder(serverName)
.setRequestCountPerSecond(serverLoadPB.getNumberOfRequests())
.setRequestCount(serverLoadPB.getTotalNumberOfRequests())
.setInfoServerPort(serverLoadPB.getInfoServerPort())
Expand All @@ -86,7 +86,10 @@ public static ServerMetrics toServerMetrics(ServerName serverName, int versionNu
.setRegionCachedInfo(serverLoadPB.getRegionCachedInfoMap())
.setReportTimestamp(serverLoadPB.getReportEndTime())
.setLastReportTimestamp(serverLoadPB.getReportStartTime()).setVersionNumber(versionNumber)
.setVersion(version).build();
.setVersion(version)
.setCacheFreeSize(serverLoadPB.hasCacheFreeSize() ? serverLoadPB.getCacheFreeSize() : 0L)
.setRegionColdDataInfo(serverLoadPB.getRegionColdDataMap());
return builder.build();
}

public static List<HBaseProtos.Coprocessor> toCoprocessor(Collection<String> names) {
Expand Down Expand Up @@ -116,6 +119,7 @@ public static ClusterStatusProtos.ServerLoad toServerLoad(ServerMetrics metrics)
if (metrics.getReplicationLoadSink() != null) {
builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink(metrics.getReplicationLoadSink()));
}
builder.setCacheFreeSize(metrics.getCacheFreeSize());
return builder.build();
}

Expand All @@ -141,6 +145,8 @@ public static ServerMetricsBuilder newBuilder(ServerName sn) {
private long lastReportTimestamp = 0;
private final List<ServerTask> tasks = new ArrayList<>();
private Map<String, Integer> regionCachedInfo = new HashMap<>();
private long cacheFreeSize;
private Map<String, Integer> regionColdDataInfo = Collections.emptyMap();

private ServerMetricsBuilder(ServerName serverName) {
this.serverName = serverName;
Expand Down Expand Up @@ -226,10 +232,21 @@ public ServerMetricsBuilder setRegionCachedInfo(Map<String, Integer> value) {
return this;
}

public ServerMetricsBuilder setCacheFreeSize(long blockCacheFreeSize) {
this.cacheFreeSize = blockCacheFreeSize;
return this;
}

public ServerMetricsBuilder setRegionColdDataInfo(Map<String, Integer> regionColdDataInfo) {
this.regionColdDataInfo = regionColdDataInfo;
return this;
}

public ServerMetrics build() {
return new ServerMetricsImpl(serverName, versionNumber, version, requestCountPerSecond,
requestCount, usedHeapSize, maxHeapSize, infoServerPort, sources, sink, regionStatus,
coprocessorNames, reportTimestamp, lastReportTimestamp, userMetrics, tasks, regionCachedInfo);
coprocessorNames, reportTimestamp, lastReportTimestamp, userMetrics, tasks, regionCachedInfo,
cacheFreeSize, regionColdDataInfo);
}

private static class ServerMetricsImpl implements ServerMetrics {
Expand All @@ -251,13 +268,16 @@ private static class ServerMetricsImpl implements ServerMetrics {
private final Map<byte[], UserMetrics> userMetrics;
private final List<ServerTask> tasks;
private final Map<String, Integer> regionCachedInfo;
private final long cacheFreeSize;
private final Map<String, Integer> regionColdDataInfo;

ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
long requestCountPerSecond, long requestCount, Size usedHeapSize, Size maxHeapSize,
int infoServerPort, List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics, List<ServerTask> tasks,
Map<String, Integer> regionCachedInfo) {
Map<String, Integer> regionCachedInfo, long cacheFreeSize,
Map<String, Integer> regionColdDataInfo) {
this.serverName = Preconditions.checkNotNull(serverName);
this.versionNumber = versionNumber;
this.version = version;
Expand All @@ -275,6 +295,8 @@ private static class ServerMetricsImpl implements ServerMetrics {
this.lastReportTimestamp = lastReportTimestamp;
this.tasks = tasks;
this.regionCachedInfo = regionCachedInfo;
this.cacheFreeSize = cacheFreeSize;
this.regionColdDataInfo = regionColdDataInfo;
}

@Override
Expand Down Expand Up @@ -372,6 +394,17 @@ public Map<String, Integer> getRegionCachedInfo() {
return Collections.unmodifiableMap(regionCachedInfo);
}

@Override
public long getCacheFreeSize() {
return cacheFreeSize;
}

@Override
public Map<String, Integer> getRegionColdDataSize() {
return Collections
.unmodifiableMap(regionColdDataInfo != null ? regionColdDataInfo : Collections.emptyMap());
}

@Override
public String toString() {
int storeCount = 0;
Expand Down
18 changes: 13 additions & 5 deletions hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -313,15 +313,23 @@ message ServerLoad {
*/
repeated UserLoad userLoads = 12;

/**
* The metrics for region cached on this region server
*/
map<string, uint32> regionCachedInfo = 13;

/**
* The active monitored tasks
*/
repeated ServerTask tasks = 15; /* 15 here to stay in sync with master branch */

map<string, uint32> regionCachedInfo = 16;

/**
* Unallocated block cache capacity on this RegionServer, in bytes.
* Used by the master for cache-aware load balancing (optional).
*/
optional uint64 cacheFreeSize = 17;

/**
* The metrics for total region cold data size
*/
map<string, uint32> regionColdData = 18;
}

message LiveServerInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class BalancerClusterState {
private int[] regionServerIndexWithBestRegionCachedRatio;
// Maps regionName -> oldServerName -> cache ratio of the region on the old server
Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;
// cache free space available on each server, aligned to the "servers" array indices;
long[] serverBlockCacheFreeSize;

private final Supplier<List<Integer>> shuffledServerIndicesSupplier =
Suppliers.memoizeWithExpiration(() -> {
Expand All @@ -148,20 +150,23 @@ public String getRack(ServerName server) {
BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder,
RackManager rackManager) {
this(null, clusterState, loads, regionFinder, rackManager, null);
this(null, clusterState, loads, regionFinder, rackManager, null, null);
}

protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder,
RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio);
RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio,
Map<ServerName, Long> serverBlockCacheFreeByServer) {
this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio,
serverBlockCacheFreeByServer);
}

@SuppressWarnings("unchecked")
BalancerClusterState(Collection<RegionInfo> unassignedRegions,
Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
RegionLocationFinder regionFinder, RackManager rackManager,
Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio,
Map<ServerName, Long> serverBlockCacheFreeByServer) {
if (unassignedRegions == null) {
unassignedRegions = Collections.emptyList();
}
Expand Down Expand Up @@ -385,6 +390,15 @@ protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack,
serversPerRack);
}

this.serverBlockCacheFreeSize = new long[numServers];
if (serverBlockCacheFreeByServer != null) {
for (int i = 0; i < numServers; i++) {
ServerName sn = servers[i];
this.serverBlockCacheFreeSize[i] =
sn == null ? 0L : serverBlockCacheFreeByServer.getOrDefault(sn, 0L);
}
}
}

private void populateRegionPerLocationFromServer(int[][] regionsPerLocation,
Expand Down Expand Up @@ -705,6 +719,18 @@ public int[] getOrComputeServerWithBestRegionCachedRatio() {
return regionServerIndexWithBestRegionCachedRatio;
}

/**
* Finds and return the latest reported cache ratio for the region on the RegionServer it's
* currently online.
*/
float getObservedRegionCacheRatio(int region) {
Deque<BalancerRegionLoad> dq = regionLoads[region];
if (dq == null || dq.isEmpty()) {
return 0.0f;
}
return dq.getLast().getCurrentRegionCacheRatio();
}

/**
* Maps region index to rack index
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private boolean isViolating(RegionPlan regionPlan) {
private RegionPlanConditional createConditional(Class<? extends RegionPlanConditional> clazz,
BalancerClusterState cluster) {
if (cluster == null) {
cluster = new BalancerClusterState(Collections.emptyMap(), null, null, null, null);
cluster = new BalancerClusterState(Collections.emptyMap(), null, null, null, null, null);
}
try {
Constructor<? extends RegionPlanConditional> ctor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ private BalancerClusterState createCluster(List<ServerName> servers,
}
}
return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager,
null);
null, null);
}

private List<ServerName> findIdleServers(List<ServerName> servers) {
Expand Down
Loading
Loading