Skip to content

Commit 3990530

Browse files
authored
HDDS-14426. OMResponse.leaderOMNodeId should not use RaftClientMessage.serverId (#9682)
1 parent 9f4fb5e commit 3990530

7 files changed

Lines changed: 67 additions & 113 deletions

File tree

hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,11 @@ public synchronized String getCurrentProxyOMNodeId() {
176176
return omProxies.getNodeId(currentProxyIndex);
177177
}
178178

179+
@VisibleForTesting
180+
public synchronized String getNextProxyOMNodeId() {
181+
return omProxies.getNodeId(nextProxyIndex);
182+
}
183+
179184
@VisibleForTesting
180185
public RetryPolicy getRetryPolicy(int maxFailovers) {
181186
// Client will attempt up to maxFailovers number of failovers between

hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
2727
import org.apache.ratis.protocol.Message;
2828
import org.apache.ratis.protocol.RaftClientReply;
29+
import org.apache.ratis.protocol.RaftPeerId;
2930
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
3031
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
3132
import org.slf4j.Logger;
@@ -64,13 +65,14 @@ public static OMResponse convertByteStringToOMResponse(ByteString bytes) throws
6465
}
6566

6667
/** Convert the given reply with proto 3 {@link ByteString} to a proto 2 response. */
67-
public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply) throws IOException {
68+
public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply, RaftPeerId leaderOMNodeId)
69+
throws IOException {
6870
final OMResponse response = convertByteStringToOMResponse(reply.getMessage().getContent());
69-
if (reply.getReplierId().equals(response.getLeaderOMNodeId())) {
71+
if (leaderOMNodeId == null) {
7072
return response;
7173
}
7274
return OMResponse.newBuilder(response)
73-
.setLeaderOMNodeId(reply.getReplierId())
75+
.setLeaderOMNodeId(leaderOMNodeId.toString())
7476
.build();
7577
}
7678

hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,18 +90,7 @@ public Hadoop3OmTransport(ConfigurationSource conf,
9090
@Override
9191
public OMResponse submitRequest(OMRequest payload) throws IOException {
9292
try {
93-
OMResponse omResponse =
94-
rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
95-
96-
if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
97-
String leaderOmId = omResponse.getLeaderOMNodeId();
98-
99-
// Failover to the OM node returned by OMResponse leaderOMNodeId if
100-
// current proxy is not pointing to that node.
101-
omFailoverProxyProvider.setNextOmProxy(leaderOmId);
102-
omFailoverProxyProvider.performFailover(null);
103-
}
104-
return omResponse;
93+
return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
10594
} catch (ServiceException e) {
10695
OMNotLeaderException notLeaderException =
10796
HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java

Lines changed: 54 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import static org.assertj.core.api.Assertions.assertThat;
2727
import static org.junit.jupiter.api.Assertions.assertEquals;
2828
import static org.junit.jupiter.api.Assertions.assertFalse;
29-
import static org.junit.jupiter.api.Assertions.assertNotEquals;
3029
import static org.junit.jupiter.api.Assertions.assertNotNull;
3130
import static org.junit.jupiter.api.Assertions.assertNotSame;
3231
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -57,10 +56,15 @@
5756
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
5857
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
5958
import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
59+
import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
60+
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
6061
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
6162
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
6263
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
6364
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeRequest;
65+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest;
66+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest.Scope;
67+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
6468
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
6569
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
6670
import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
@@ -122,50 +126,6 @@ void testFollowerReadTargetsFollower() throws Exception {
122126
assertEquals(followerOMNodeId, lastProxy.getNodeId());
123127
}
124128

125-
@Test
126-
public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
127-
ObjectStore objectStore = getObjectStore();
128-
HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> omFailoverProxyProvider =
129-
OmTestUtil.getFailoverProxyProvider(objectStore);
130-
HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> followerReadFailoverProxyProvider =
131-
OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore);
132-
String initialFollowerReadNodeId = followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
133-
134-
// Run couple of createVolume tests to discover the current Leader OM
135-
createVolumeTest(true);
136-
createVolumeTest(true);
137-
138-
// The oMFailoverProxyProvider will point to the current leader OM node.
139-
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
140-
141-
// Perform a manual failover of the proxy provider to move the
142-
// currentProxyIndex to a node other than the leader OM.
143-
omFailoverProxyProvider.selectNextOmProxy();
144-
omFailoverProxyProvider.performFailover(null);
145-
146-
String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
147-
assertNotEquals(leaderOMNodeId, newProxyNodeId);
148-
149-
// Once another request is sent to this new proxy node, the leader
150-
// information must be returned via the response and a failover must
151-
// happen to the leader proxy node.
152-
// This will also do some read operations where this might read from the follower.
153-
createVolumeTest(true);
154-
Thread.sleep(2000);
155-
156-
String newLeaderOMNodeId =
157-
omFailoverProxyProvider.getCurrentProxyOMNodeId();
158-
159-
// The old and new Leader OM NodeId must match since there was no new
160-
// election in the Ratis ring.
161-
assertEquals(leaderOMNodeId, newLeaderOMNodeId);
162-
163-
// The follower read proxy should remain unchanged since the follower is not throwing exceptions
164-
// The performFailover on the leader proxy should not affect the follower read proxy provider
165-
String currentFollowerReadNodeId = followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
166-
assertEquals(initialFollowerReadNodeId, currentFollowerReadNodeId);
167-
}
168-
169129
/**
170130
* Choose a follower to send the request, the returned exception should
171131
* include the suggested leader node.
@@ -461,4 +421,53 @@ public void testAllBucketOperations() throws Exception {
461421
OzoneTestUtils.expectOmException(OMException.ResultCodes.BUCKET_NOT_FOUND,
462422
() -> retVolume.deleteBucket(bucketName));
463423
}
424+
425+
@Test
426+
void testOMResponseLeaderOmNodeId() throws Exception {
427+
HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> omFailoverProxyProvider =
428+
OmTestUtil.getFailoverProxyProvider(getObjectStore());
429+
HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> followerReadFailoverProxyProvider =
430+
OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore());
431+
432+
// Make sure all OMs are ready
433+
createVolumeTest(true);
434+
435+
// The OMFailoverProxyProvider will point to the current leader OM node.
436+
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
437+
String initialNextProxyOmNodeId = omFailoverProxyProvider.getNextProxyOMNodeId();
438+
OzoneManager followerOM = null;
439+
for (OzoneManager om: getCluster().getOzoneManagersList()) {
440+
if (!om.isLeaderReady()) {
441+
followerOM = om;
442+
break;
443+
}
444+
}
445+
assertNotNull(followerOM);
446+
assertSame(OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER,
447+
followerOM.getOmRatisServer().getLeaderStatus());
448+
449+
450+
ListVolumeRequest req =
451+
ListVolumeRequest.newBuilder()
452+
.setScope(Scope.VOLUMES_BY_USER)
453+
.build();
454+
455+
OzoneManagerProtocolProtos.OMRequest readRequest =
456+
OzoneManagerProtocolProtos.OMRequest.newBuilder()
457+
.setCmdType(Type.ListVolume)
458+
.setListVolumeRequest(req)
459+
.setVersion(ClientVersion.CURRENT_VERSION)
460+
.setClientId(randomUUID().toString())
461+
.build();
462+
463+
OmTransport omTransport = ((OzoneManagerProtocolClientSideTranslatorPB)
464+
getObjectStore().getClientProxy().getOzoneManagerClient()).getTransport();
465+
followerReadFailoverProxyProvider.changeInitialProxyForTest(followerOM.getOMNodeId());
466+
OMResponse omResponse = omTransport.submitRequest(readRequest);
467+
468+
// The leaderOMNodeId in the returned OM response should match the actual leader OM node ID
469+
assertEquals(leaderOMNodeId, omResponse.getLeaderOMNodeId());
470+
// There should not be any change in the leader proxy's next proxy OM node ID
471+
assertEquals(initialNextProxyOmNodeId, omFailoverProxyProvider.getNextProxyOMNodeId());
472+
}
464473
}

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import static org.assertj.core.api.Assertions.assertThat;
3434
import static org.junit.jupiter.api.Assertions.assertEquals;
3535
import static org.junit.jupiter.api.Assertions.assertFalse;
36-
import static org.junit.jupiter.api.Assertions.assertNotEquals;
3736
import static org.junit.jupiter.api.Assertions.assertNotNull;
3837
import static org.junit.jupiter.api.Assertions.assertSame;
3938
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -311,45 +310,6 @@ void testOMProxyProviderInitialization() {
311310
}
312311
}
313312

314-
/**
315-
* Test HadoopRpcOMFailoverProxyProvider failover when current OM proxy is not
316-
* the current OM Leader.
317-
*/
318-
@Test
319-
public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
320-
ObjectStore objectStore = getObjectStore();
321-
final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> omFailoverProxyProvider
322-
= OmTestUtil.getFailoverProxyProvider(objectStore);
323-
324-
// Run couple of createVolume tests to discover the current Leader OM
325-
createVolumeTest(true);
326-
createVolumeTest(true);
327-
328-
// The oMFailoverProxyProvider will point to the current leader OM node.
329-
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
330-
331-
// Perform a manual failover of the proxy provider to move the
332-
// currentProxyIndex to a node other than the leader OM.
333-
omFailoverProxyProvider.selectNextOmProxy();
334-
omFailoverProxyProvider.performFailover(null);
335-
336-
String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
337-
assertNotEquals(leaderOMNodeId, newProxyNodeId);
338-
339-
// Once another request is sent to this new proxy node, the leader
340-
// information must be returned via the response and a failover must
341-
// happen to the leader proxy node.
342-
createVolumeTest(true);
343-
Thread.sleep(2000);
344-
345-
String newLeaderOMNodeId =
346-
omFailoverProxyProvider.getCurrentProxyOMNodeId();
347-
348-
// The old and new Leader OM NodeId must match since there was no new
349-
// election in the Ratis ring.
350-
assertEquals(leaderOMNodeId, newLeaderOMNodeId);
351-
}
352-
353313
/**
354314
* Choose a follower to send the request, the returned exception should
355315
* include the suggested leader node.

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ private OMResponse createOmResponseImpl(OMRequest omRequest,
617617

618618
private OMResponse getOMResponse(RaftClientReply reply) throws ServiceException {
619619
try {
620-
return OMRatisHelper.getOMResponseFromRaftClientReply(reply);
620+
return OMRatisHelper.getOMResponseFromRaftClientReply(reply, getLeaderId());
621621
} catch (IOException ex) {
622622
if (ex.getMessage() != null) {
623623
throw new ServiceException(ex.getMessage(), ex);

hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -85,18 +85,7 @@ public Hadoop27RpcTransport(ConfigurationSource conf,
8585
@Override
8686
public OMResponse submitRequest(OMRequest payload) throws IOException {
8787
try {
88-
OMResponse omResponse =
89-
rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
90-
91-
if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
92-
String leaderOmId = omResponse.getLeaderOMNodeId();
93-
94-
// Failover to the OM node returned by OMResponse leaderOMNodeId if
95-
// current proxy is not pointing to that node.
96-
omFailoverProxyProvider.setNextOmProxy(leaderOmId);
97-
omFailoverProxyProvider.performFailover(null);
98-
}
99-
return omResponse;
88+
return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
10089
} catch (ServiceException e) {
10190
OMNotLeaderException notLeaderException =
10291
HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);

0 commit comments

Comments
 (0)