1616
1717package org .springframework .web .reactive .socket ;
1818
19+ import java .nio .charset .StandardCharsets ;
1920import java .time .Duration ;
21+ import java .util .Arrays ;
2022import java .util .Collections ;
2123import java .util .HashMap ;
2224import java .util .List ;
2729import org .apache .commons .logging .LogFactory ;
2830import reactor .core .publisher .Flux ;
2931import reactor .core .publisher .Mono ;
32+ import reactor .netty .http .client .WebsocketClientSpec ;
3033import reactor .util .retry .Retry ;
3134
3235import org .springframework .context .annotation .Bean ;
3336import org .springframework .context .annotation .Configuration ;
37+ import org .springframework .core .io .buffer .DataBuffer ;
3438import org .springframework .http .HttpHeaders ;
3539import org .springframework .http .ResponseCookie ;
3640import org .springframework .web .filter .reactive .ServerWebExchangeContextFilter ;
3741import org .springframework .web .reactive .HandlerMapping ;
3842import org .springframework .web .reactive .handler .SimpleUrlHandlerMapping ;
43+ import org .springframework .web .reactive .socket .adapter .NettyWebSocketSessionSupport ;
44+ import org .springframework .web .reactive .socket .client .ReactorNettyWebSocketClient ;
3945import org .springframework .web .reactive .socket .client .WebSocketClient ;
4046import org .springframework .web .server .WebFilter ;
4147import org .springframework .web .testfixture .http .server .reactive .bootstrap .HttpServer ;
4248import org .springframework .web .testfixture .http .server .reactive .bootstrap .TomcatHttpServer ;
4349
4450import static org .assertj .core .api .Assertions .assertThat ;
51+ import static org .assertj .core .api .Assertions .assertThatCode ;
52+ import static org .junit .jupiter .api .Assumptions .assumeTrue ;
4553
4654/**
4755 * Integration tests with server-side {@link WebSocketHandler}s.
@@ -186,6 +194,35 @@ void cookie(WebSocketClient client, HttpServer server, Class<?> serverConfigClas
186194 assertThat (cookie .get ()).isEqualTo ("project=spring" );
187195 }
188196
197+ @ ParameterizedWebSocketTest
198+ void largePayload (WebSocketClient client , HttpServer server , Class <?> serverConfigClass ) throws Exception {
199+
200+ assumeTrue (client instanceof ReactorNettyWebSocketClient );
201+
202+ int defaultFrameMaxSize = NettyWebSocketSessionSupport .DEFAULT_FRAME_MAX_SIZE ;
203+ int extendedLimit = 2 * defaultFrameMaxSize ;
204+
205+ ReactorNettyWebSocketClient nettyClient = (ReactorNettyWebSocketClient ) client ;
206+ ReactorNettyWebSocketClient reconfiguredClient = new ReactorNettyWebSocketClient (
207+ nettyClient .getHttpClient (),
208+ () -> WebsocketClientSpec .builder ().maxFramePayloadLength (extendedLimit ));
209+
210+ startServer (reconfiguredClient , server , serverConfigClass );
211+
212+ AtomicReference <Integer > payloadSizeRef = new AtomicReference <>();
213+ assertThatCode (() -> reconfiguredClient .execute (getUrl ("/large-payload" ),
214+ session -> session .receive ()
215+ .map (WebSocketMessage ::getPayload )
216+ .map (DataBuffer ::readableByteCount )
217+ .reduce (Integer ::sum )
218+ .doOnNext (payloadSizeRef ::set )
219+ .then ())
220+ .block (TIMEOUT ))
221+ .doesNotThrowAnyException ();
222+
223+ assertThat (payloadSizeRef .get ()).isGreaterThan (defaultFrameMaxSize );
224+ assertThat (payloadSizeRef .get ()).isEqualTo (extendedLimit );
225+ }
189226
190227 @ Configuration
191228 static class WebConfig {
@@ -198,6 +235,7 @@ public HandlerMapping handlerMapping() {
198235 map .put ("/custom-header" , new CustomHeaderHandler ());
199236 map .put ("/close" , new SessionClosingHandler ());
200237 map .put ("/cookie" , new CookieHandler ());
238+ map .put ("/large-payload" , new LargePayloadHandler ());
201239 return new SimpleUrlHandlerMapping (map );
202240 }
203241
@@ -274,4 +312,15 @@ public Mono<Void> handle(WebSocketSession session) {
274312 }
275313 }
276314
315+ private static class LargePayloadHandler implements WebSocketHandler {
316+
317+ @ Override
318+ public Mono <Void > handle (WebSocketSession session ) {
319+ int doubledFrameSize = 2 * NettyWebSocketSessionSupport .DEFAULT_FRAME_MAX_SIZE ;
320+ byte [] payload = new byte [doubledFrameSize ];
321+ Arrays .fill (payload , (byte ) 'x' );
322+ String text = new String (payload , StandardCharsets .UTF_8 );
323+ return session .send (Mono .just (session .textMessage (text )));
324+ }
325+ }
277326}
0 commit comments