Skip to content

Commit ef0c21e

Browse files
committed
C++ MCS connection scaling interop test client side changes
1 parent eca34ab commit ef0c21e

3 files changed

Lines changed: 92 additions & 4 deletions

File tree

test/cpp/interop/client.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,14 @@ int main(int argc, char** argv) {
228228
factories.emplace_back(
229229
new grpc::testing::MetadataAndStatusLoggerInterceptorFactory());
230230
}
231-
std::string service_config_json =
232-
absl::GetFlag(FLAGS_service_config_json);
233-
if (!service_config_json.empty()) {
234-
arguments.SetServiceConfigJSON(service_config_json);
231+
if (test_case == "mcs_cs") {
232+
arguments.SetServiceConfigJSON("{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}");
233+
} else {
234+
std::string service_config_json =
235+
absl::GetFlag(FLAGS_service_config_json);
236+
if (!service_config_json.empty()) {
237+
arguments.SetServiceConfigJSON(service_config_json);
238+
}
235239
}
236240
return CreateChannelForTestCase(test_case, std::move(factories),
237241
arguments);
@@ -279,6 +283,9 @@ int main(int argc, char** argv) {
279283
std::bind(&grpc::testing::InteropClient::DoOrcaPerRpc, &client);
280284
actions["orca_oob"] =
281285
std::bind(&grpc::testing::InteropClient::DoOrcaOob, &client);
286+
actions["mcs_cs"] =
287+
std::bind(&grpc::testing::InteropClient::DoMcsConnectionScaling, &client);
288+
282289
if (absl::GetFlag(FLAGS_use_tls)) {
283290
actions["compute_engine_creds"] =
284291
std::bind(&grpc::testing::InteropClient::DoComputeEngineCreds, &client,

test/cpp/interop/interop_client.cc

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1361,6 +1361,86 @@ bool InteropClient::DoLongLivedChannelTest(int32_t soak_iterations,
13611361
}
13621362
}
13631363

1364+
bool InteropClient::DoMcsConnectionScaling() {
1365+
VLOG(2) << "Sending Mcs connection scaling streaming rpc1 ...";
1366+
1367+
ClientContext context1;
1368+
std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
1369+
StreamingOutputCallResponse>>
1370+
stream1(serviceStub_.Get()->FullDuplexCall(&context1));
1371+
1372+
StreamingOutputCallRequest request;
1373+
Payload* payload = request.mutable_payload();
1374+
payload->set_body("max concurrent streaming connection scaling");
1375+
StreamingOutputCallResponse response1;
1376+
1377+
if (!stream1->Write(request)) {
1378+
LOG(ERROR) << "DoMcsConnectionScaling(): stream1->Write() failed.";
1379+
return TransientFailureOrAbort();
1380+
}
1381+
std::string clientSocketAddressInCall1 = response1.payload().body();
1382+
1383+
VLOG(2) << "Sending Mcs connection scaling streaming rpc2 ...";
1384+
1385+
ClientContext context2;
1386+
std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
1387+
StreamingOutputCallResponse>>
1388+
stream2(serviceStub_.Get()->FullDuplexCall(&context2));
1389+
1390+
StreamingOutputCallResponse response2;
1391+
1392+
if (!stream2->Write(request)) {
1393+
LOG(ERROR) << "DoMcsConnectionScaling(): stream2->Write() failed.";
1394+
return TransientFailureOrAbort();
1395+
}
1396+
std::string clientSocketAddressInCall2 = response2.payload().body();
1397+
1398+
// The same connection should have been used for both streams.
1399+
GRPC_CHECK(response1.payload().body() == response2.payload().body());
1400+
1401+
VLOG(2) << "Sending Mcs connection scaling streaming rpc3 ...";
1402+
1403+
ClientContext context3;
1404+
std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
1405+
StreamingOutputCallResponse>>
1406+
stream3(serviceStub_.Get()->FullDuplexCall(&context3));
1407+
1408+
StreamingOutputCallResponse response3;
1409+
1410+
if (!stream3->Write(request)) {
1411+
LOG(ERROR) << "DoMcsConnectionScaling(): stream3->Write() failed.";
1412+
return TransientFailureOrAbort();
1413+
}
1414+
std::string clientSocketAddressInCall3 = response3.payload().body();
1415+
1416+
// A new connection should have been used for the 3rd stream.
1417+
GRPC_CHECK(response1.payload().body() != response3.payload().body());
1418+
1419+
stream1->WritesDone();
1420+
GRPC_CHECK(!stream1->Read(&response1));
1421+
Status s = stream1->Finish();
1422+
if (!AssertStatusOk(s, context1.debug_error_string())) {
1423+
return false;
1424+
}
1425+
1426+
stream2->WritesDone();
1427+
GRPC_CHECK(!stream2->Read(&response2));
1428+
s = stream2->Finish();
1429+
if (!AssertStatusOk(s, context2.debug_error_string())) {
1430+
return false;
1431+
}
1432+
1433+
stream3->WritesDone();
1434+
GRPC_CHECK(!stream3->Read(&response3));
1435+
s = stream3->Finish();
1436+
if (!AssertStatusOk(s, context3.debug_error_string())) {
1437+
return false;
1438+
}
1439+
1440+
VLOG(2) << "Mcs connection scaling done.";
1441+
return true;
1442+
}
1443+
13641444
bool InteropClient::DoUnimplementedService() {
13651445
VLOG(2) << "Sending a request for an unimplemented service...";
13661446

test/cpp/interop/interop_client.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class InteropClient {
7878
bool DoPickFirstUnary();
7979
bool DoOrcaPerRpc();
8080
bool DoOrcaOob();
81+
bool DoMcsConnectionScaling();
8182

8283
// The following interop test are not yet part of the interop spec, and are
8384
// not implemented cross-language. They are considered experimental for now,

0 commit comments

Comments
 (0)