1515import io .netty .channel .socket .nio .NioSocketChannel ;
1616import io .netty .handler .codec .http .*;
1717import io .netty .handler .codec .http .websocketx .*;
18+ import io .netty .handler .codec .http .websocketx .extensions .compression .WebSocketClientCompressionHandler ;
19+ import io .netty .handler .codec .http .websocketx .extensions .compression .WebSocketServerCompressionHandler ;
1820import io .netty .util .ReferenceCountUtil ;
1921import org .reactivestreams .Processor ;
2022import org .testng .annotations .AfterClass ;
@@ -41,8 +43,12 @@ public class WebSocketsTest {
4143 private BlockingQueue <Object > clientEvents = new LinkedBlockingQueue <>();
4244 private int port ;
4345
44- @ Test
45- public void simpleWebSocket () throws Exception {
46+ /**
47+ * Note: withCompression and withoutExtensions will not work as compression requires Extensions.
48+ * @param withCompression Enable Compression for this test
49+ * @param withExtensions Enable WebSocket Extensions on the handshaker
50+ */
51+ private void simpleWebSocket (final boolean withCompression , final boolean withExtensions ) throws Exception {
4652 start (new AutoReadHandler () {
4753 @ Override
4854 public void channelRead (ChannelHandlerContext ctx , Object msg ) throws Exception {
@@ -68,13 +74,13 @@ public WebSocketFrame apply(WebSocketFrame msg) throws Exception {
6874
6975 ctx .writeAndFlush (new DefaultWebSocketHttpResponse (request .protocolVersion (),
7076 HttpResponseStatus .valueOf (200 ), processor ,
71- new WebSocketServerHandshakerFactory ("ws://127.0.0.1/" + port + "/" , null , false )
77+ new WebSocketServerHandshakerFactory ("ws://127.0.0.1/" + port + "/" , null , withExtensions )
7278 ));
7379 }
7480 }
75- });
81+ }, withCompression );
7682
77- makeWebSocketRequest ();
83+ makeWebSocketRequest (withCompression , withExtensions );
7884 assertNoMessages ();
7985 client .writeAndFlush (new TextWebSocketFrame ("hello" ));
8086 assertEquals (readTextFrame (), "echo hello" );
@@ -106,6 +112,21 @@ public WebSocketFrame apply(WebSocketFrame msg) throws Exception {
106112 assertNoMessages ();
107113 }
108114
115+ @ Test
116+ public void simpleWebSocketWithCompressionAndExtensions () throws Exception {
117+ simpleWebSocket (true , true );
118+ }
119+
120+ @ Test
121+ public void simpleWebSocketWithoutCompressionWithoutExtensions () throws Exception {
122+ simpleWebSocket (false , false );
123+ }
124+
125+ @ Test
126+ public void simpleWebSocketWithoutCompressionWithExtensions () throws Exception {
127+ simpleWebSocket (false , true );
128+ }
129+
109130 @ Test
110131 public void rejectWebSocket () throws Exception {
111132 start (new AutoReadHandler () {
@@ -166,6 +187,10 @@ public void closeChannels() throws InterruptedException {
166187 }
167188
168189 private void start (final ChannelHandler handler ) throws InterruptedException {
190+ start (handler , false );
191+ }
192+
193+ private void start (final ChannelHandler handler , final boolean enableCompression ) throws InterruptedException {
169194 ServerBootstrap bootstrap = new ServerBootstrap ();
170195 bootstrap .group (eventLoop )
171196 .channel (NioServerSocketChannel .class )
@@ -179,8 +204,14 @@ protected void initChannel(SocketChannel ch) throws Exception {
179204 pipeline .addLast (
180205 new HttpRequestDecoder (),
181206 new HttpResponseEncoder ()
182- ).addLast ("serverStreamsHandler" , new HttpStreamsServerHandler ())
183- .addLast (handler );
207+ );
208+
209+ if (enableCompression ) {
210+ pipeline .addLast (new WebSocketServerCompressionHandler ());
211+ }
212+ pipeline
213+ .addLast ("serverStreamsHandler" , new HttpStreamsServerHandler ())
214+ .addLast (handler );
184215 }
185216 });
186217
@@ -197,8 +228,11 @@ protected void initChannel(SocketChannel ch) throws Exception {
197228 protected void initChannel (SocketChannel ch ) throws Exception {
198229 final ChannelPipeline pipeline = ch .pipeline ();
199230
200- pipeline .addLast (new HttpClientCodec (), new HttpObjectAggregator (8192 ))
201- .addLast (new AutoReadHandler () {
231+ pipeline .addLast (new HttpClientCodec (), new HttpObjectAggregator (8192 ));
232+
233+ if (enableCompression ) pipeline .addLast (WebSocketClientCompressionHandler .INSTANCE );
234+
235+ pipeline .addLast (new AutoReadHandler () {
202236 // Store a reference to the current client events
203237 BlockingQueue <Object > events = clientEvents ;
204238 public void channelRead (ChannelHandlerContext ctx , Object msg ) throws Exception {
@@ -211,12 +245,20 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
211245 this .client = client .remoteAddress (serverBindChannel .localAddress ()).connect ().await ().channel ();
212246 }
213247
214- private void makeWebSocketRequest () throws InterruptedException {
248+ private void makeWebSocketRequest (final boolean withCompression , final boolean withExtensions ) throws InterruptedException {
215249 WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory .newHandshaker (
216250 URI .create ("ws://127.0.0.1:" + port + "/" ),
217- WebSocketVersion .V13 , null , false , new DefaultHttpHeaders ());
251+ WebSocketVersion .V13 , null , withExtensions , new DefaultHttpHeaders ());
218252 handshaker .handshake (client );
219253 FullHttpResponse response = receiveFullResponse ();
254+ HttpHeaders headers = response .headers ();
255+ if (withCompression ) {
256+ assertTrue (headers .contains ("sec-websocket-extensions" ));
257+ assertEquals (headers .get ("sec-websocket-extensions" ), "permessage-deflate" );
258+ } else {
259+ assertTrue (!headers .contains ("sec-websocket-extensions" ) ||
260+ !headers .get ("sec-websocket-extensions" ).contains ("permessage-deflate" ));
261+ }
220262 handshaker .finishHandshake (client , response );
221263 }
222264
0 commit comments