Skip to content

Commit fd89481

Browse files
authored
HDDS-13890. Datanode supports dynamic configuration of SCM (#9385)
1 parent 9708658 commit fd89481

15 files changed

Lines changed: 923 additions & 12 deletions

File tree

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
3232
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT;
3333
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY;
34+
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
3435
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
3536
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
3637
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEFAULT_SERVICE_ID;
@@ -57,10 +58,12 @@
5758
import java.util.Objects;
5859
import java.util.Optional;
5960
import java.util.OptionalInt;
61+
import java.util.Set;
6062
import java.util.TreeMap;
6163
import java.util.UUID;
6264
import javax.management.ObjectName;
6365
import org.apache.commons.lang3.StringUtils;
66+
import org.apache.commons.lang3.tuple.Pair;
6467
import org.apache.hadoop.conf.ConfigRedactor;
6568
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
6669
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -331,6 +334,38 @@ public static Collection<InetSocketAddress> getSCMAddressForDatanodes(
331334
}
332335
}
333336

337+
/**
338+
* Returns the SCM address for datanodes based on the service ID and the SCM addresses.
339+
* @param conf Configuration
340+
* @param scmServiceId SCM service ID
341+
* @param scmNodeIds Requested SCM node IDs
342+
* @return A collection with addresses of the request SCM node IDs.
343+
* Null if there is any wrongly configured SCM address. Note that the returned collection
344+
* might not be ordered the same way as the requested SCM node IDs
345+
*/
346+
public static Collection<Pair<String, InetSocketAddress>> getSCMAddressForDatanodes(
347+
ConfigurationSource conf, String scmServiceId, Set<String> scmNodeIds) {
348+
Collection<Pair<String, InetSocketAddress>> scmNodeAddress = new HashSet<>(scmNodeIds.size());
349+
for (String scmNodeId : scmNodeIds) {
350+
String addressKey = ConfUtils.addKeySuffixes(
351+
OZONE_SCM_ADDRESS_KEY, scmServiceId, scmNodeId);
352+
String scmAddress = conf.get(addressKey);
353+
if (scmAddress == null) {
354+
LOG.warn("The SCM address configuration {} is not defined, return nothing", addressKey);
355+
return null;
356+
}
357+
358+
int scmDatanodePort = SCMNodeInfo.getPort(conf, scmServiceId, scmNodeId,
359+
OZONE_SCM_DATANODE_ADDRESS_KEY, OZONE_SCM_DATANODE_PORT_KEY,
360+
OZONE_SCM_DATANODE_PORT_DEFAULT);
361+
362+
String scmDatanodeAddressStr = SCMNodeInfo.buildAddress(scmAddress, scmDatanodePort);
363+
InetSocketAddress scmDatanodeAddress = NetUtils.createSocketAddr(scmDatanodeAddressStr);
364+
scmNodeAddress.add(Pair.of(scmNodeId, scmDatanodeAddress));
365+
}
366+
return scmNodeAddress;
367+
}
368+
334369
/**
335370
* Retrieve the socket addresses of recon.
336371
*

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,11 @@ public static List<SCMNodeInfo> buildNodeInfo(ConfigurationSource conf) {
181181

182182
}
183183

184-
private static String buildAddress(String address, int port) {
185-
return new StringBuilder().append(address).append(':')
186-
.append(port).toString();
184+
public static String buildAddress(String address, int port) {
185+
return address + ':' + port;
187186
}
188187

189-
private static int getPort(ConfigurationSource conf,
188+
public static int getPort(ConfigurationSource conf,
190189
String scmServiceId, String scmNodeId, String configKey,
191190
String portKey, int defaultPort) {
192191
String suffixKey = ConfUtils.addKeySuffixes(configKey, scmServiceId,

hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ default String getTrimmed(String key, String defaultValue) {
9898

9999
default String[] getTrimmedStrings(String name) {
100100
String valueString = get(name);
101+
return getTrimmedStringsFromValue(valueString);
102+
}
103+
104+
static String[] getTrimmedStringsFromValue(String valueString) {
101105
if (null == valueString) {
102106
return EMPTY_STRING_ARRAY;
103107
}

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.HTTP;
2121
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.HTTPS;
22+
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NODES_KEY;
2223
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser;
2324
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientWithMaxRetry;
2425
import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
@@ -34,23 +35,33 @@
3435

3536
import com.google.common.annotations.VisibleForTesting;
3637
import com.google.common.base.Preconditions;
38+
import com.google.common.collect.Sets;
3739
import java.io.File;
3840
import java.io.IOException;
41+
import java.net.InetSocketAddress;
3942
import java.util.Arrays;
43+
import java.util.Collection;
4044
import java.util.HashMap;
45+
import java.util.HashSet;
4146
import java.util.List;
4247
import java.util.Map;
4348
import java.util.Objects;
49+
import java.util.Set;
4450
import java.util.concurrent.Callable;
4551
import java.util.concurrent.ConcurrentHashMap;
4652
import java.util.concurrent.atomic.AtomicBoolean;
53+
import java.util.stream.Collectors;
54+
import java.util.stream.Stream;
4755
import javax.management.ObjectName;
56+
import org.apache.commons.lang3.StringUtils;
57+
import org.apache.commons.lang3.tuple.Pair;
4858
import org.apache.hadoop.conf.Configurable;
4959
import org.apache.hadoop.hdds.DatanodeVersion;
5060
import org.apache.hadoop.hdds.HddsConfigKeys;
5161
import org.apache.hadoop.hdds.HddsUtils;
5262
import org.apache.hadoop.hdds.cli.GenericCli;
5363
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
64+
import org.apache.hadoop.hdds.conf.ConfigurationSource;
5465
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
5566
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
5667
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -72,6 +83,9 @@
7283
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
7384
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
7485
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
86+
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
87+
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
88+
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
7589
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
7690
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
7791
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -124,6 +138,7 @@ public class HddsDatanodeService extends GenericCli implements Callable<Void>, S
124138
private HddsDatanodeClientProtocolServer clientProtocolServer;
125139
private OzoneAdmins admins;
126140
private ReconfigurationHandler reconfigurationHandler;
141+
private String scmServiceId;
127142

128143
//Constructor for DataNode PluginService
129144
public HddsDatanodeService() { }
@@ -207,6 +222,7 @@ public void start(OzoneConfiguration configuration) {
207222
start();
208223
}
209224

225+
@SuppressWarnings("methodlength")
210226
public void start() {
211227
serviceRuntimeInfo = new DNMXBeanImpl(HddsVersionInfo.HDDS_VERSION_INFO) {
212228
@Override
@@ -294,6 +310,12 @@ public String getNamespace() {
294310
.register(REPLICATION_STREAMS_LIMIT_KEY,
295311
this::reconfigReplicationStreamsLimit);
296312

313+
scmServiceId = HddsUtils.getScmServiceId(conf);
314+
if (scmServiceId != null) {
315+
reconfigurationHandler.register(OZONE_SCM_NODES_KEY + "." + scmServiceId,
316+
this::reconfigScmNodes);
317+
}
318+
297319
reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());
298320

299321
datanodeStateMachine = new DatanodeStateMachine(this, datanodeDetails, conf,
@@ -680,6 +702,112 @@ private String reconfigBlockDeletingServiceTimeout(String value) {
680702
return value;
681703
}
682704

705+
/**
706+
* Reconfigure the SCM nodes configuration which will trigger the creation and removal of
707+
* SCM connections based on the difference between the old and the new SCM nodes configuration.
708+
* <p>
709+
* The assumption is that the SCM node address configurations exists for all the involved node IDs
710+
* This is because reconfiguration can only support one configuration field at a time
711+
* @param value The new configuration value for "ozone.scm.nodes.SERVICEID"
712+
* @return new configuration for "ozone.scm.nodes.SERVICEID" which reflects the SCMs that the datanode has
713+
* is not connected to.
714+
*/
715+
private String reconfigScmNodes(String value) {
716+
if (StringUtils.isBlank(value)) {
717+
throw new IllegalArgumentException("Reconfiguration failed since setting the empty SCM nodes " +
718+
"configuration is not allowed");
719+
}
720+
Set<String> previousNodeIds = new HashSet<>(HddsUtils.getSCMNodeIds(getConf(), scmServiceId));
721+
Set<String> newScmNodeIds = Stream.of(ConfigurationSource.getTrimmedStringsFromValue(value))
722+
.collect(Collectors.toSet());
723+
724+
if (newScmNodeIds.isEmpty()) {
725+
throw new IllegalArgumentException("Reconfiguration failed since setting the empty SCM nodes " +
726+
"configuration is not allowed");
727+
}
728+
729+
Set<String> scmNodesIdsToAdd = Sets.difference(newScmNodeIds, previousNodeIds);
730+
Set<String> scmNodesIdsToRemove = Sets.difference(previousNodeIds, newScmNodeIds);
731+
732+
// We should only update configuration with the SCMs that are actually added / removed
733+
// If there is partial reconfiguration (e.g. one successful add and one failed add),
734+
// we want to be able to retry on the failed node reconfiguration.
735+
// If we don't handle this, the subsequent reconfiguration will not work since the node
736+
// configuration is already exists / removed.
737+
Set<String> effectiveScmNodeIds = new HashSet<>(previousNodeIds);
738+
739+
LOG.info("Reconfiguring SCM nodes for service ID {} with new SCM nodes {} and remove SCM nodes {}",
740+
scmServiceId, scmNodesIdsToAdd, scmNodesIdsToRemove);
741+
742+
Collection<Pair<String, InetSocketAddress>> scmToAdd = HddsUtils.getSCMAddressForDatanodes(
743+
getConf(), scmServiceId, scmNodesIdsToAdd);
744+
if (scmToAdd == null) {
745+
throw new IllegalStateException("Reconfiguration failed to get SCM address to add due to wrong configuration");
746+
}
747+
Collection<Pair<String, InetSocketAddress>> scmToRemove = HddsUtils.getSCMAddressForDatanodes(
748+
getConf(), scmServiceId, scmNodesIdsToRemove);
749+
if (scmToRemove == null) {
750+
throw new IllegalArgumentException(
751+
"Reconfiguration failed to get SCM address to remove due to wrong configuration");
752+
}
753+
754+
StateContext context = datanodeStateMachine.getContext();
755+
SCMConnectionManager connectionManager = datanodeStateMachine.getConnectionManager();
756+
757+
// Assert that the datanode is in RUNNING state since
758+
// 1. If the datanode state is INIT, there might be concurrent connection manager operations
759+
// that might cause unpredictable behaviors
760+
// 2. If the datanode state is SHUTDOWN, it means that datanode is shutting down and there is no need
761+
// to reconfigure the connections.
762+
if (!DatanodeStates.RUNNING.equals(context.getState())) {
763+
throw new IllegalStateException("Reconfiguration failed since the datanode the current state" +
764+
context.getState().toString() + " is not in RUNNING state");
765+
}
766+
767+
// Add the new SCM servers
768+
for (Pair<String, InetSocketAddress> pair : scmToAdd) {
769+
String scmNodeId = pair.getLeft();
770+
InetSocketAddress scmAddress = pair.getRight();
771+
if (scmAddress.isUnresolved()) {
772+
LOG.warn("Reconfiguration failed to add SCM address {} for SCM service {} since it can't " +
773+
"be resolved, skipping", scmAddress, scmServiceId);
774+
continue;
775+
}
776+
try {
777+
connectionManager.addSCMServer(scmAddress, context.getThreadNamePrefix());
778+
context.addEndpoint(scmAddress);
779+
effectiveScmNodeIds.add(scmNodeId);
780+
LOG.info("Reconfiguration successfully add SCM address {} for SCM service {}", scmAddress, scmServiceId);
781+
} catch (IOException e) {
782+
LOG.error("Reconfiguration failed to add SCM address {} for SCM service {}", scmAddress, scmServiceId, e);
783+
}
784+
}
785+
786+
// Remove the old SCM server
787+
for (Pair<String, InetSocketAddress> pair : scmToRemove) {
788+
String scmNodeId = pair.getLeft();
789+
InetSocketAddress scmAddress = pair.getRight();
790+
try {
791+
connectionManager.removeSCMServer(scmAddress);
792+
context.removeEndpoint(scmAddress);
793+
effectiveScmNodeIds.remove(scmNodeId);
794+
LOG.info("Reconfiguration successfully remove SCM address {} for SCM service {}",
795+
scmAddress, scmServiceId);
796+
} catch (IOException e) {
797+
LOG.error("Reconfiguration failed to remove SCM address {} for SCM service {}", scmAddress, scmServiceId, e);
798+
}
799+
}
800+
801+
// Resize the executor pool size to (number of SCMs + 1 Recon)
802+
// Refer to DatanodeStateMachine#getEndPointTaskThreadPoolSize
803+
datanodeStateMachine.resizeExecutor(connectionManager.getNumOfConnections());
804+
805+
// TODO: In the future, we might also do some assertions on the SCM
806+
// - The SCM cannot be a leader since this causes the datanode to disappear
807+
// - The SCM should be decommissioned
808+
return String.join(",", effectiveScmNodeIds);
809+
}
810+
683811
/**
684812
* Returns the initial version of the datanode.
685813
*/

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.hadoop.metrics2.lib.Interns.info;
2121

22+
import com.google.common.annotations.VisibleForTesting;
2223
import com.google.common.base.CaseFormat;
2324
import java.net.InetSocketAddress;
2425
import java.util.HashMap;
@@ -171,6 +172,27 @@ public void addEndpoint(InetSocketAddress endpoint) {
171172
.to(CaseFormat.UPPER_CAMEL, k.getHostName())));
172173
}
173174

175+
public void removeEndpoint(InetSocketAddress endpoint) {
176+
incrementalReportsQueueMap.remove(endpoint);
177+
containerActionQueueMap.remove(endpoint);
178+
pipelineActionQueueMap.remove(endpoint);
179+
}
180+
181+
/** Test-only accessor: number of endpoints tracked in the incremental reports queue map. */
@VisibleForTesting
public int getIncrementalReportsQueueMapSize() {
  return incrementalReportsQueueMap.size();
}
185+
186+
/** Test-only accessor: number of endpoints tracked in the container action queue map. */
@VisibleForTesting
public int getContainerActionQueueMapSize() {
  return containerActionQueueMap.size();
}
190+
191+
/** Test-only accessor: number of endpoints tracked in the pipeline action queue map. */
@VisibleForTesting
public int getPipelineActionQueueMapSize() {
  return pipelineActionQueueMap.size();
}
195+
174196
private MetricsInfo getMetricsInfo(String prefix, String metricName) {
175197
String metric = prefix + WordUtils.capitalize(metricName) + "Size";
176198
String description = "Queue size of " + metricName + " from " + prefix;

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
4646
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
4747
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
48+
import org.apache.hadoop.hdds.utils.HddsServerUtil;
4849
import org.apache.hadoop.hdds.utils.IOUtils;
4950
import org.apache.hadoop.hdds.utils.NettyMetrics;
5051
import org.apache.hadoop.hdfs.util.EnumCounters;
@@ -795,4 +796,19 @@ public VolumeChoosingPolicy getVolumeChoosingPolicy() {
795796
public void setNextHB(long time) {
796797
nextHB.set(time);
797798
}
799+
800+
/** Test-only accessor for the endpoint task executor. */
@VisibleForTesting
public ExecutorService getExecutorService() {
  return executorService;
}
804+
805+
/**
806+
* Resize the executor based on the number of active endpoint tasks.
807+
*/
808+
public void resizeExecutor(int size) {
809+
if (executorService instanceof ThreadPoolExecutor) {
810+
ThreadPoolExecutor tpe = (ThreadPoolExecutor) executorService;
811+
HddsServerUtil.setPoolSize(tpe, size, LOG);
812+
}
813+
}
798814
}

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.hadoop.ipc_.RPC;
4545
import org.apache.hadoop.metrics2.util.MBeans;
4646
import org.apache.hadoop.net.NetUtils;
47+
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine.EndPointStates;
4748
import org.apache.hadoop.ozone.protocolPB.ReconDatanodeProtocolPB;
4849
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
4950
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
@@ -227,15 +228,14 @@ public void addReconServer(InetSocketAddress address,
227228
public void removeSCMServer(InetSocketAddress address) throws IOException {
228229
writeLock();
229230
try {
230-
if (!scmMachines.containsKey(address)) {
231+
EndpointStateMachine endPoint = scmMachines.remove(address);
232+
if (endPoint == null) {
231233
LOG.warn("Trying to remove a non-existent SCM machine. " +
232234
"Ignoring the request.");
233235
return;
234236
}
235-
236-
EndpointStateMachine endPoint = scmMachines.get(address);
237+
endPoint.setState(EndPointStates.SHUTDOWN);
237238
endPoint.close();
238-
scmMachines.remove(address);
239239
} finally {
240240
writeUnlock();
241241
}
@@ -274,4 +274,16 @@ public List<EndpointStateMachineMBean> getSCMServers() {
274274
readUnlock();
275275
}
276276
}
277+
278+
/**
 * Returns the current number of endpoint connections held by this manager,
 * taken under the read lock for a consistent snapshot.
 *
 * @return the number of connections (both SCM and Recon)
 */
public int getNumOfConnections() {
  readLock();
  try {
    // NOTE(review): scmMachines appears to hold Recon endpoints as well
    // (see addReconServer), hence "both SCM and Recon" — confirm.
    return scmMachines.size();
  } finally {
    readUnlock();
  }
}
277289
}

0 commit comments

Comments
 (0)