Skip to content

Latest commit

 

History

History
288 lines (216 loc) · 8.43 KB

webflux-websocket.adoc

File metadata and controls

288 lines (216 loc) · 8.43 KB

WebSockets

This part of the reference documentation covers support for Reactive stack, WebSocket messaging.

WebSocket API

The Spring Framework provides a WebSocket API that can be used to write client and server side applications that handle WebSocket messages.

Server

To create a WebSocket server, first create a WebSocketHandler:

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// ...
	}
}

Then map it to a URL and add a WebSocketHandlerAdapter:

@Configuration
static class WebConfig {

	@Bean
	public HandlerMapping handlerMapping() {
		Map<String, WebSocketHandler> map = new HashMap<>();
		map.put("/path", new MyWebSocketHandler());

		SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
		mapping.setUrlMap(map);
		mapping.setOrder(-1); // before annotated controllers
		return mapping;
	}

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}

WebSocketHandler

The handle method of WebSocketHandler takes WebSocketSession and returns Mono<Void> to indicate when application handling of the session is complete. The session is handled through two streams, one for inbound and one for outbound messages:

WebSocketSession method Description

Flux<WebSocketMessage> receive()

Provides access to the inbound message stream, and completes when the connection is closed.

Mono<Void> send(Publisher<WebSocketMessage>)

Takes a source for outgoing messages, writes the messages, and returns a Mono<Void> that completes when the source completes and writing is done.

A WebSocketHandler must compose the inbound and outbound streams into a unified flow, and return a Mono<Void> that reflects the completion of that flow. Depending on application requirements, the unified flow completes when:

  • Either inbound or outbound message streams complete.

  • Inbound stream completes (i.e. connection closed), while outbound is infinite.

  • At a chosen point through the close method of WebSocketSession.

When inbound and outbound message streams are composed together, there is no need to check if the connection is open, since Reactive Streams signals will terminate activity. The inbound stream receives a completion/error signal, and the outbound stream receives receives a cancellation signal.

The most basic implementation of a handler is one that handles the inbound stream:

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		return session.receive()			(1)
				.doOnNext(message -> {
					// ...					(2)
				})
				.concatMap(message -> {
					// ...					(3)
				})
				.then();					(4)
	}
}
  1. Access stream of inbound messages.

  2. Do something with each message.

  3. Perform nested async operation using message content.

  4. Return Mono<Void> that completes when receiving completes.

Tip

For nested, asynchronous operations, you may need to call message.retain() on underlying servers that use pooled data buffers (e.g. Netty), or otherwise the data buffer may be released before you’ve had a chance to read the data. For more background see Data Buffers and Codecs.

The below implementation combines the inbound with the outbound streams:

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Flux<WebSocketMessage> output = session.receive()				(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.map(value -> session.textMessage("Echo " + value));	(2)

		return session.send(output);									(3)
	}
}
  1. Handle inbound message stream.

  2. Create outbound message, producing a combined flow.

  3. Return Mono<Void> that doesn’t complete while we continue to receive.

Inbound and outbound streams can be independent, and joined only for completion:

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()								(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.then();

		Flux<String> source = ... ;
		Mono<Void> output = session.send(source.map(session::textMessage));	(2)

		return Mono.zip(input, output).then();								(3)
	}
}
  1. Handle inbound message stream.

  2. Send outgoing messages.

  3. Join the streams and return Mono<Void> that completes when either stream ends.

Handshake

WebSocketHandlerAdapter delegates to a WebSocketService. By default that’s an instance of HandshakeWebSocketService, which performs basic checks on the WebSocket request and then uses RequestUpgradeStrategy for the server in use. Currently there is built-in support for Reactor Netty, Tomcat, Jetty, and Undertow.

HandshakeWebSocketService exposes a sessionAttributePredicate property that allows setting a Predicate<String> to extract attributes from the WebSession and insert them into the attributes of the WebSocketSession.

Server config

The RequestUpgradeStrategy for each server exposes WebSocket-related configuration options available for the underlying WebSocket engine. Below is an example of setting WebSocket options when running on Tomcat:

@Configuration
static class WebConfig {

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter(webSocketService());
	}

	@Bean
	public WebSocketService webSocketService() {
		TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
		strategy.setMaxSessionIdleTimeout(0L);
		return new HandshakeWebSocketService(strategy);
	}
}

Check the upgrade strategy for your server to see what options are available. Currently only Tomcat and Jetty expose such options.

CORS

The easiest way to configure CORS and restrict access to a WebSocket endpoint is to have your WebSocketHandler implement CorsConfigurationSource and return a CorsConfiguraiton with allowed origins, headers, etc. If for any reason you can’t do that, you can also set the corsConfigurations property on the SimpleUrlHandler to specify CORS settings by URL pattern. If both are specified they’re combined via the combine method on CorsConfiguration.

Client

Spring WebFlux provides a WebSocketClient abstraction with implementations for Reactor Netty, Tomcat, Jetty, Undertow, and standard Java (i.e. JSR-356).

Note

The Tomcat client is effectively an extension of the standard Java one with some extra functionality in the WebSocketSession handling taking advantage of Tomcat specific API to suspend receiving messages for back pressure.

To start a WebSocket session, create an instance of the client and use its execute methods:

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
		session.receive()
				.doOnNext(System.out::println)
				.then());

Some clients, e.g. Jetty, implement Lifecycle and need to be started in stopped before you can use them. All clients have constructor options related to configuration of the underlying WebSocket client.