Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,6 @@ public static State convert(ClusterStatusProtos.RegionState.State protoState) {
private final RegionInfo hri;
private final ServerName serverName;
private final State state;
// The duration of region in transition
private long ritDuration;

public static RegionState createForTesting(RegionInfo region, State state) {
return new RegionState(region, state, EnvironmentEdgeManager.currentTime(), null);
Expand All @@ -195,16 +193,10 @@ public RegionState(RegionInfo region, State state, ServerName serverName) {
}

public RegionState(RegionInfo region, State state, long stamp, ServerName serverName) {
this(region, state, stamp, serverName, 0);
}

public RegionState(RegionInfo region, State state, long stamp, ServerName serverName,
long ritDuration) {
this.hri = region;
this.state = state;
this.stamp = stamp;
this.serverName = serverName;
this.ritDuration = ritDuration;
}

public State getState() {
Expand All @@ -223,19 +215,6 @@ public ServerName getServerName() {
return serverName;
}

public long getRitDuration() {
return ritDuration;
}

/**
* Update the duration of region in transition
* @param previousStamp previous RegionState's timestamp
*/
@InterfaceAudience.Private
void updateRitDuration(long previousStamp) {
this.ritDuration += (this.stamp - previousStamp);
}

public boolean isClosing() {
return state == State.CLOSING;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL);
forceRegionRetainmentRetries =
conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES);

this.setRitDurationConsumer();
}

private void mirrorMetaLocations() throws IOException, KeeperException {
Expand Down Expand Up @@ -759,6 +761,10 @@ private List<RegionInfo> getSystemTables(ServerName serverName) {
return serverNode.getSystemRegionInfoList();
}

private void setRitDurationConsumer() {
regionInTransitionTracker.setRitDurationConsumer(metrics::updateRitDuration);
}

private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
throws HBaseIOException {
if (regionNode.getProcedure() != null) {
Expand Down Expand Up @@ -1743,7 +1749,7 @@ private void update(final Collection<RegionState> regions, final long currentTim
ritsOverThreshold = new HashMap<String, RegionState>();
}
ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2L)) ? 1 : 0;
}
if (oldestRITTime < ritTime) {
oldestRITTime = ritTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Consumer;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,8 +46,11 @@ public class RegionInTransitionTracker {

private final ConcurrentSkipListMap<RegionInfo, RegionStateNode> regionInTransition =
new ConcurrentSkipListMap<>(RegionInfo.COMPARATOR);
private final ConcurrentHashMap<RegionInfo, Long> regionEnterTimestamp =
new ConcurrentHashMap<>();

private TableStateManager tableStateManager;
private Consumer<Long> ritDurationConsumer;

public boolean isRegionInTransition(final RegionInfo regionInfo) {
return regionInTransition.containsKey(regionInfo);
Expand Down Expand Up @@ -131,15 +137,29 @@ public void handleRegionDelete(RegionInfo regionInfo) {
}

private boolean addRegionInTransition(final RegionStateNode regionStateNode) {
return regionInTransition.putIfAbsent(regionStateNode.getRegionInfo(), regionStateNode) == null;
boolean added =
regionInTransition.putIfAbsent(regionStateNode.getRegionInfo(), regionStateNode) == null;
if (added) {
regionEnterTimestamp.putIfAbsent(regionStateNode.getRegionInfo(),
EnvironmentEdgeManager.currentTime());
}
return added;
}

private boolean removeRegionInTransition(final RegionInfo regionInfo) {
return regionInTransition.remove(regionInfo) != null;
boolean removed = regionInTransition.remove(regionInfo) != null;
if (removed) {
Long enter = regionEnterTimestamp.remove(regionInfo);
if (enter != null && ritDurationConsumer != null) {
ritDurationConsumer.accept(EnvironmentEdgeManager.currentTime() - enter);
}
}
return removed;
}

public void stop() {
regionInTransition.clear();
regionEnterTimestamp.clear();
}

public boolean hasRegionsInTransition() {
Expand All @@ -153,4 +173,8 @@ public List<RegionStateNode> getRegionsInTransition() {
public void setTableStateManager(TableStateManager tableStateManager) {
this.tableStateManager = tableStateManager;
}

public void setRitDurationConsumer(Consumer<Long> ritDurationConsumer) {
this.ritDurationConsumer = ritDurationConsumer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.TableDescriptorChecker;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(MasterTests.TAG)
@Tag(MediumTests.TAG)
public class TestAssignmentManagerRitDurationMetrics {

private static final Logger LOG =
LoggerFactory.getLogger(TestAssignmentManagerRitDurationMetrics.class);

private static final MetricsAssertHelper METRICS_HELPER =
CompatibilityFactory.getInstance(MetricsAssertHelper.class);

private static SingleProcessHBaseCluster CLUSTER;
private static HMaster MASTER;
private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final int MSG_INTERVAL = 1000;

private String methodName;

@BeforeAll
public static void startCluster() throws Exception {
LOG.info("Starting cluster");
Configuration conf = TEST_UTIL.getConfiguration();

// Enable sanity check for coprocessor, so that region reopen fails on the RS
conf.setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, true);
// set RIT stuck warning threshold to a small value
conf.setInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 20);
// set msgInterval to 1 second
conf.setInt("hbase.regionserver.msginterval", MSG_INTERVAL);
// set tablesOnMaster to none
conf.set("hbase.balancer.tablesOnMaster", "none");
// set client sync wait timeout to 5sec
conf.setInt("hbase.client.sync.wait.timeout.msec", 5000);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 2500);
// set a small interval for updating rit metrics
conf.setInt(AssignmentManager.RIT_CHORE_INTERVAL_MSEC_CONF_KEY, MSG_INTERVAL);
// set a small assign attempts for avoiding assert when retrying. (HBASE-20533)
conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 3);
// keep rs online so it can report the failed opens.
conf.setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);

TEST_UTIL.startMiniCluster(2);
CLUSTER = TEST_UTIL.getHBaseCluster();
MASTER = CLUSTER.getMaster();
// Disable sanity check for coprocessor, so that modify table runs on the HMaster
MASTER.getConfiguration().setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, false);
}

@AfterAll
public static void after() throws Exception {
LOG.info("AFTER {} <= IS THIS NULL?", TEST_UTIL);
TEST_UTIL.shutdownMiniCluster();
}

@BeforeEach
public void setUp(TestInfo testInfo) throws Exception {
methodName = testInfo.getTestMethod().get().getName();
}

@Test
public void testRitDurationHistogramMetric() throws Exception {
final TableName tableName = TableName.valueOf(methodName);
final byte[] family = Bytes.toBytes("family");
try (Table table = TEST_UTIL.createTable(tableName, family)) {
final byte[] row = Bytes.toBytes("row");
final byte[] qualifier = Bytes.toBytes("qualifier");
final byte[] value = Bytes.toBytes("value");

Put put = new Put(row);
put.addColumn(family, qualifier, value);
table.put(put);
Thread.sleep(MSG_INTERVAL * 2);

MetricsAssignmentManagerSource amSource =
MASTER.getAssignmentManager().getAssignmentManagerMetrics().getMetricsProcSource();
long ritDurationNumOps = getRitCountFromRegionStates(amSource);

RegionInfo regionInfo = MASTER.getAssignmentManager().getRegionStates()
.getRegionsOfTable(tableName).iterator().next();
ServerName current =
MASTER.getAssignmentManager().getRegionStates().getRegionServerOfRegion(regionInfo);
ServerName target = MASTER.getServerManager().getOnlineServersList().stream()
.filter(sn -> !sn.equals(current)).findFirst()
.orElseThrow(() -> new IllegalStateException("Need at least two regionservers"));

TEST_UTIL.getAdmin().move(regionInfo.getEncodedNameAsBytes(), target);
TEST_UTIL.waitFor(10_000, () -> getRitCountFromRegionStates(amSource) > ritDurationNumOps);
TEST_UTIL.waitUntilNoRegionTransitScheduled();
Thread.sleep(MSG_INTERVAL * 5);
assertEquals(ritDurationNumOps + 1, getRitCountFromRegionStates(amSource));
}
}

private long getRitCountFromRegionStates(MetricsAssignmentManagerSource amSource) {
return METRICS_HELPER.getCounter("ritDurationNumOps", amSource);
}
}
Loading