RSocket With Spring Boot + JS: Zero to Hero
A tutorial.
What’s RSocket?
RSocket is a binary, asynchronous, one-to-one, stateless, symmetrical protocol that runs on top of one of TCP, WebSocket, Aeron, or HTTP/2 streams, which serves as the underlying transport.
It was designed to match Reactive Streams semantics, so it integrates seamlessly into applications that depend heavily on Reactive libraries such as Project Reactor or ReactiveX.
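To make "Reactive Streams semantics" a little more concrete, here is a minimal sketch of demand-driven delivery using the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams API. The class names DemandDemo, RangePublisher and Collector are made up for this illustration and are not part of RSocket or Reactor; the publisher is deliberately simplified (single-threaded, no validation of non-positive demand).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandDemo {

    /** Hypothetical publisher: emits 0..count-1, but never more than the subscriber asked for. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean finished = false;

                @Override
                public void request(long n) {
                    // Emit at most n items, then stop until more demand arrives.
                    for (long i = 0; i < n && next < count && !finished; i++) {
                        subscriber.onNext(next++);
                    }
                    if (next == count && !finished) {
                        finished = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    finished = true;
                }
            });
        }
    }

    /** Hypothetical subscriber: stores what it receives and exposes its subscription. */
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Flow.Subscription subscription;
        boolean completed = false;

        @Override public void onSubscribe(Flow.Subscription s) { this.subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        new RangePublisher(5).subscribe(collector);

        collector.subscription.request(2);              // ask for two items only
        System.out.println(collector.received);

        collector.subscription.request(Long.MAX_VALUE); // effectively "send everything"
        System.out.println(collector.received + " completed=" + collector.completed);
    }
}
```

Nothing is delivered until the subscriber signals demand, which is the mechanism RSocket carries across the network to implement backpressure.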
Why Should I Use RSocket?
Bare HTTP just doesn’t cut it, especially in the modern world where software architecture leans heavily towards microservices.
Microservices need to communicate potentially with a myriad of other microservices, in a tangled and twisted dance that doesn’t always go along with the core principles upon which HTTP has been built: sending text over the wire, in a request ⇄ response fashion.
It is often required that microservices send out events in a fire-and-forget manner (brokers and advanced messaging protocols help with this but at the cost of adding significant complexity to infrastructure and applications relying on them), or request some data and hold onto the connection expecting a stream of data coming through as a response, over time.
HTTP is not an efficient solution in either of these scenarios, whereas a transport protocol built specifically for computers talking to other computers asynchronously and with high performance in mind, such as WebSocket, seems to be a very good fit.
RSocket provides all the advantages of choosing the best transport protocol for the task, and builds things like Reactive Streams semantics, backpressure management, load-balancing hints and resumability on top of it! Great stuff!
Interaction Models
RSocket is based on four main interaction models that enable symmetric interaction over a single connection:
- Request/Response: similar to HTTP, but the client waits for the response (a stream of one element) in a non-blocking manner;
- Request/Stream: the client receives elements that compose a stream of many, over time;
- Fire-and-Forget: the client sends some data and expects no response;
- Channel: the most customizable interaction model, where the client and the server can exchange data in whatever way fits a specific task (e.g. the server sends 2 frames for each frame* sent by the client).
*a frame is a single message containing either a request or a response
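As a compact summary of the list above, the sketch below models the four interaction models as a Java enum. The frame type codes are the ones the RSocket protocol specification assigns to the frames that start each interaction; the names InteractionModels, Model and Cardinality are invented for this illustration.

```java
final class InteractionModels {

    /** How many payloads may flow back to the requester. */
    enum Cardinality { NONE, ONE, MANY }

    /** The four interaction models and the frame type that opens each of them. */
    enum Model {
        REQUEST_RESPONSE(0x04, false, Cardinality.ONE),
        FIRE_AND_FORGET(0x05, false, Cardinality.NONE),
        REQUEST_STREAM(0x06, false, Cardinality.MANY),
        REQUEST_CHANNEL(0x07, true, Cardinality.MANY);

        private final int frameType;
        private final boolean requesterStreams;
        private final Cardinality responses;

        Model(int frameType, boolean requesterStreams, Cardinality responses) {
            this.frameType = frameType;
            this.requesterStreams = requesterStreams;
            this.responses = responses;
        }

        /** Frame type code of the frame that starts the interaction. */
        int frameType() { return frameType; }

        /** True when the requester may keep sending payloads after the first one. */
        boolean requesterStreams() { return requesterStreams; }

        /** How many payloads the responder may send back. */
        Cardinality responses() { return responses; }

        /** False only for fire-and-forget. */
        boolean expectsResponse() { return responses != Cardinality.NONE; }
    }

    public static void main(String[] args) {
        for (Model model : Model.values()) {
            System.out.printf("%-16s frame=0x%02X requesterStreams=%b responses=%s%n",
                    model, model.frameType(), model.requesterStreams(), model.responses());
        }
    }
}
```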
Resilience and High Availability
RSocket satisfies resiliency and high availability requirements by providing features such as connection/request resumption and load-balancing hints through leasing.
Resumption is the ability to resume operation in case of failure (e.g. recovering an abruptly closed connection).
When using RSocket, clients are completely in charge of resumption, and it is not enabled by default.
It is particularly useful because the client, by sending a RESUME frame that carries information about the last frame it received, can resume the connection and request only the data it has not already received. This avoids unnecessary load on the server and saves the time that would otherwise be spent retrieving the same data twice.
Enable it wherever a dropped connection would otherwise force clients to request data they already hold.
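The idea behind resumption can be sketched in a few lines of plain Java. This is a simplified model, not the wire protocol: the real RESUME frame exchanges positions, whereas the hypothetical ResumableStream class below just counts frames, and its method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class ResumableStream {

    // Server-side buffer of every frame sent so far (a real server would bound this).
    private final List<String> sent = new ArrayList<>();

    /** Server side: record a frame as sent to the client. */
    void send(String frame) {
        sent.add(frame);
    }

    /**
     * Server side: given how many frames the client reports having received,
     * return only the frames it is still missing.
     */
    List<String> resumeFrom(int lastReceivedPosition) {
        if (lastReceivedPosition < 0 || lastReceivedPosition > sent.size()) {
            throw new IllegalArgumentException(
                    "cannot resume from position " + lastReceivedPosition);
        }
        return new ArrayList<>(sent.subList(lastReceivedPosition, sent.size()));
    }

    public static void main(String[] args) {
        ResumableStream stream = new ResumableStream();
        stream.send("tweet-1");
        stream.send("tweet-2");
        stream.send("tweet-3");

        // The connection dropped after the client had processed one frame.
        int clientPosition = 1;
        System.out.println("replayed after resume: " + stream.resumeFrom(clientPosition));
    }
}
```

The client only has to remember a single number, and the server replays the tail of the stream instead of starting over.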
Leasing can be enabled so that servers issue leases to clients, making sure said clients don’t exceed the defined request rates (the client should not send more than Number of Requests in any particular Time-To-Live timeframe, or else its requests will be rejected until a new, valid lease can be issued).
Servers are completely in charge of leasing and, as is the case for resumption, it is not enabled by default either.
When a cluster of machines exposes the same API via RSocket, it is wise to enable leasing and use the server-provided responses as load-balancing hints for smart request routing, targeting the machines that are more likely to issue a valid lease with higher frequency.
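The lease rule described above (at most Number of Requests within one Time-To-Live window) can be sketched as a small bookkeeping class. The Lease class and its methods are hypothetical and only model the accounting; they are not the API of any RSocket library, and time is passed in explicitly to keep the example deterministic.

```java
final class Lease {

    private final long expiresAtMillis;
    private int remainingRequests;

    /**
     * @param numberOfRequests  how many requests the lease allows
     * @param timeToLiveMillis  how long the lease stays valid
     * @param issuedAtMillis    the moment the server issued the lease
     */
    Lease(int numberOfRequests, long timeToLiveMillis, long issuedAtMillis) {
        this.remainingRequests = numberOfRequests;
        this.expiresAtMillis = issuedAtMillis + timeToLiveMillis;
    }

    /** A lease is valid while it has requests left and has not expired. */
    boolean isValid(long nowMillis) {
        return remainingRequests > 0 && nowMillis < expiresAtMillis;
    }

    /** Consumes one request if the lease is still valid; returns false if the request must be rejected. */
    boolean tryUse(long nowMillis) {
        if (!isValid(nowMillis)) {
            return false;
        }
        remainingRequests--;
        return true;
    }

    /** Requests still available; a router could use this as a load-balancing hint. */
    int remainingRequests() {
        return remainingRequests;
    }

    public static void main(String[] args) {
        Lease lease = new Lease(2, 1_000, 0);
        System.out.println(lease.tryUse(10));   // first request fits in the lease
        System.out.println(lease.tryUse(20));   // second request fits too
        System.out.println(lease.tryUse(30));   // budget exhausted, rejected
    }
}
```

A client-side load balancer could, for instance, prefer the server whose current lease reports the most remaining requests.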
I Want to See It in Action!
Worry not! I’ve prepared a reactive demo application based on Spring Boot 2.2 that shows the request/stream interaction mode of RSocket in action, over WebSocket transport, with a Java RSocket Server, a Java RSocket client and a Javascript RSocket client.
I’ll explain all the steps required to create an app similar to the demo one further down, but, if you’re impatient, you can jump right into it.
Either dig into the source code yourself or follow the quickstart to see RSocket in action.
Quickstart
- Clone the repository: https://github.com/dsibilio/rsocket-demo.git, and move inside the cloned directory
- Deploy the Java RSocket Server and spin up both the Java RSocket client and the Javascript RSocket client by running:
mvn spring-boot:run
What Can I Do Now?
The Java backend exposes the following APIs*:
- HTTP http://localhost:8080/socket/{author}: the HTTP request triggers the Java RSocket client to pull data from the Java RSocket server
- HTTP http://localhost:8080/tweets/{author}: the same API as above but without any socket interaction, pure SSE over HTTP (for comparison)
- WS ws://localhost:8080/tweetsocket (route: tweets.by.author): WebSocket transport employed by the Javascript RSocket client to pull data from the Java RSocket server
The Javascript client can be seen in action by going to http://localhost:8080/index.html; it connects to the Java RSocket server directly to pull data from it in a request/stream fashion.
How To: Build Your App With RSocket
This section will help you develop an application like the demo one, using Java to host an RSocket server, and both Java and Javascript to pull data from the said server via RSocket clients.
NOTE: both Spring Boot 2.2 and RSocket are products under development at the time of writing this article, therefore the following steps might slightly change in the future!
Java RSocket Client and Java RSocket Server
In order to develop a working Java RSocket Server and Client implementation, go through the following steps.
- Generate the backbone of your project by heading to https://start.spring.io/, selecting Spring Boot 2.2.0 M6 as your Spring Boot version, and adding Spring Reactive Web and RSocket as selected dependencies.
- Replace the src/main/resources/application.properties file with an application.yml like the following:
local:
  server:
    port: 8080
spring:
  rsocket:
    server:
      mapping-path: /tweetsocket
      transport: websocket
- Create an RSocketConfiguration class inside the com.example.demo.config package:
package com.example.demo.config;

import reactor.core.publisher.Mono;

import org.springframework.boot.autoconfigure.rsocket.RSocketProperties;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;

import java.net.URI;

@Configuration
public class RSocketConfiguration {

    @LocalServerPort
    private int port;

    // Client-side requester that connects to the local RSocket server over WebSocket
    @Bean
    public Mono<RSocketRequester> rSocketRequester(
            RSocketStrategies rSocketStrategies,
            RSocketProperties rSocketProps) {
        return RSocketRequester.builder()
                .rsocketStrategies(rSocketStrategies)
                .connectWebSocket(getURI(rSocketProps));
    }

    // Builds ws://localhost:<port><mapping-path> from the configured properties
    private URI getURI(RSocketProperties rSocketProps) {
        return URI.create(String.format("ws://localhost:%d%s",
                port, rSocketProps.getServer().getMappingPath()));
    }
}
- Create a simple Tweet POJO, like the one below, and put it inside the com.example.demo.domain package:
package com.example.demo.domain;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;

import java.time.LocalDate;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

public class Tweet {

    private String id;
    private String author;
    private String body;
    @JsonDeserialize(using = LocalDateDeserializer.class)
    @JsonSerialize(using = LocalDateSerializer.class)
    private LocalDate date;

    // No-args constructor required for JSON deserialization
    public Tweet() {}

    public Tweet(String author, String body) {
        this.id = UUID.randomUUID().toString();
        this.author = author;
        this.body = body;
        this.date = getRandomDate();
    }

    // Copies author and body into a new Tweet with a fresh id and date
    public static Tweet of(Tweet tweet) {
        return new Tweet(tweet.getAuthor(), tweet.getBody());
    }

    // Days are capped at 28 so the date is valid for every month
    private LocalDate getRandomDate() {
        ThreadLocalRandom r = ThreadLocalRandom.current();
        return LocalDate.of(r.nextInt(1990, 2020), r.nextInt(1, 13), r.nextInt(1, 29));
    }

    public String getId() {
        return id;
    }

    public String getAuthor() {
        return author;
    }

    public String getBody() {
        return body;
    }

    public LocalDate getDate() {
        return date;
    }
}
- Create a TweetRequest class to be used as a filter for tweets, and move it inside the com.example.demo.domain package:
package com.example.demo.domain;

public class TweetRequest {

    private String author;

    public TweetRequest() {}

    public TweetRequest(String author) {
        this.author = author;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }
}
- Create a TweetService class that will return an endless stream of Tweets, and put it inside the com.example.demo.service package:
package com.example.demo.service;

import com.example.demo.domain.Tweet;
import reactor.core.publisher.Flux;

import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

@Service
public class TweetService {

    private static final Map<String, Tweet> tweets = new HashMap<String, Tweet>() {
        {
            put("linustorvalds", new Tweet("Linus Torvalds",
                    "Talk is cheap. Show me the code."));
            put("robertmartin", new Tweet("Robert Martin",
                    "Truth can only be found in one place: the code."));
            put("martinfowler", new Tweet("Martin Fowler",
                    "Any fool can write code that a computer can understand. Good programmers write code that humans can understand."));
        }
    };

    // Emits a fresh copy of the author's tweet once per second, forever
    public Flux<Tweet> getByAuthor(String author) {
        return Flux
                .interval(Duration.ZERO, Duration.ofSeconds(1))
                .map(i -> Tweet.of(tweets.get(author)));
    }
}
- Create a TweetSocketController class that sets up a route, tweets.by.author, for our socket to receive TweetRequests; place it inside the com.example.demo.api.rsocket package:
package com.example.demo.api.rsocket;

import com.example.demo.domain.Tweet;
import com.example.demo.domain.TweetRequest;
import com.example.demo.service.TweetService;
import reactor.core.publisher.Flux;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

@Controller
public class TweetSocketController {

    private final TweetService service;

    public TweetSocketController(TweetService service) {
        this.service = service;
    }

    // Handles request/stream interactions sent to the tweets.by.author route
    @MessageMapping("tweets.by.author")
    public Flux<Tweet> getByAuthor(TweetRequest request) {
        return service.getByAuthor(request.getAuthor());
    }
}
- Finally, create a TweetController class that serves as a Rest controller to expose an SSE endpoint that triggers the Java RSocket client to pull data from the Java RSocket server; place it inside the com.example.demo.api.rest package:
You can test the end result by running mvn spring-boot:run and heading to http://localhost:8080/tweets/linustorvalds.
You should see something like this:
Javascript RSocket Client
It is now time to put together our Javascript RSocket client, so that the browser will be able to pull data directly from the Java RSocket server without ever issuing any HTTP request!
PREREQUISITES: npm and browserify must be installed!
- Create a new folder, named public, under src/main/resources and place the following index.html file inside it (it’s pretty straightforward and shouldn’t need any explanation):
<html>
  <head>
    <title>RSocket Demo</title>
  </head>
  <body>
    <h2>Filtering by Author</h2>
    <select id="author-filter">
      <option value="linustorvalds">Linus Torvalds</option>
      <option value="martinfowler">Martin Fowler</option>
      <option value="robertmartin">Robert Martin</option>
    </select>
    <h2>Messages</h2>
    <p>
      <ul id="messages">
      </ul>
    </p>
    <script src="app.js"></script>
  </body>
</html>
- Inside the same folder, add the index.js file which represents our Javascript RSocket Client implementation:
const {
  RSocketClient,
  JsonSerializer,
  IdentitySerializer
} = require('rsocket-core');
const RSocketWebSocketClient = require('rsocket-websocket-client').default;

var client = undefined;

function addErrorMessage(prefix, error) {
  var ul = document.getElementById("messages");
  var li = document.createElement("li");
  li.appendChild(document.createTextNode(prefix + error));
  ul.appendChild(li);
}

function addMessage(message) {
  var ul = document.getElementById("messages");
  var li = document.createElement("li");
  li.appendChild(document.createTextNode(JSON.stringify(message)));
  ul.appendChild(li);
}

function main() {
  // Close any previous connection and clear the list before reconnecting
  if (client !== undefined) {
    client.close();
    document.getElementById("messages").innerHTML = "";
  }

  // Create an instance of a client
  client = new RSocketClient({
    serializers: {
      data: JsonSerializer,
      metadata: IdentitySerializer
    },
    setup: {
      // ms btw sending keepalive to server
      keepAlive: 60000,
      // ms timeout if no keepalive response
      lifetime: 180000,
      // format of `data`
      dataMimeType: 'application/json',
      // format of `metadata`
      metadataMimeType: 'message/x.rsocket.routing.v0',
    },
    transport: new RSocketWebSocketClient({
      url: 'ws://localhost:8080/tweetsocket'
    }),
  });

  // Open the connection
  client.connect().subscribe({
    onComplete: socket => {
      // socket provides the rsocket interactions fire/forget, request/response,
      // request/stream, etc as well as methods to close the socket.
      socket.requestStream({
        data: {
          'author': document.getElementById("author-filter").value
        },
        // routing metadata: one length byte followed by the route name
        metadata: String.fromCharCode('tweets.by.author'.length) + 'tweets.by.author',
      }).subscribe({
        onComplete: () => console.log('complete'),
        onError: error => {
          console.log(error);
          addErrorMessage("Connection has been closed due to ", error);
        },
        onNext: payload => {
          console.log(payload.data);
          addMessage(payload.data);
        },
        onSubscribe: subscription => {
          // request the largest 32-bit demand, i.e. effectively unbounded
          subscription.request(2147483647);
        },
      });
    },
    onError: error => {
      console.log(error);
      addErrorMessage("Connection has been refused due to ", error);
    },
    onSubscribe: cancel => {
      /* call cancel() to abort */
    }
  });
}

document.addEventListener('DOMContentLoaded', main);
document.getElementById('author-filter').addEventListener('change', main);
Whenever the page is loaded or the author-filter select list value is updated, the main() function is invoked, which in turn disconnects any previously connected RSocket client and opens a new connection to ws://localhost:8080/tweetsocket.
Once a connection is successfully obtained, the request/stream interaction mode is used to obtain a stream of tweets over time, specifying the route (tweets.by.author) as the message metadata with MIME type message/x.rsocket.routing.v0, and showing each received tweet on the page.
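The metadata expression in the Javascript client, String.fromCharCode(route.length) + route, produces a length-prefixed route: one byte holding the length, followed by the route itself. A minimal Java sketch of the same encoding for a single route is shown below; RoutingMetadata and its methods are hypothetical helper names for this illustration, not part of Spring or the RSocket libraries.

```java
import java.nio.charset.StandardCharsets;

final class RoutingMetadata {

    /** Encodes a single route as one length byte followed by its UTF-8 bytes. */
    static byte[] encode(String route) {
        byte[] utf8 = route.getBytes(StandardCharsets.UTF_8);
        if (utf8.length > 255) {
            throw new IllegalArgumentException("route does not fit in a one-byte length prefix");
        }
        byte[] metadata = new byte[utf8.length + 1];
        metadata[0] = (byte) utf8.length;
        System.arraycopy(utf8, 0, metadata, 1, utf8.length);
        return metadata;
    }

    /** Reads the first route back out of the encoded metadata. */
    static String decodeFirst(byte[] metadata) {
        int length = metadata[0] & 0xFF; // treat the prefix as an unsigned byte
        return new String(metadata, 1, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] metadata = encode("tweets.by.author");
        System.out.println("length prefix: " + (metadata[0] & 0xFF));
        System.out.println("route: " + decodeFirst(metadata));
    }
}
```

On the Java side of the demo none of this is visible, because the @MessageMapping annotation and RSocketRequester take care of routing metadata.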
- Add the following package.json file inside the public folder:
{
  "name": "rsocket-demo",
  "private": true,
  "description": "RSocket Demo",
  "version": "0.0.1",
  "repository": {
    "type": "git",
    "url": "https://github.com/dsibilio/rsocket-demo.git"
  },
  "license": "BSD-3-Clause",
  "dependencies": {
    "fbjs": "^0.8.12",
    "rsocket-core": "^0.0.10",
    "rsocket-flowable": "^0.0.10",
    "rsocket-tcp-server": "^0.0.10",
    "rsocket-types": "^0.0.10",
    "rsocket-websocket-client": "^0.0.10",
    "rsocket-websocket-server": "^0.0.10",
    "ws": "^5.2.1"
  }
}
- Move inside the src/main/resources/public directory and run the following command to download all the required dependencies:
npm install
- Without changing directory, run:
browserify index.js > app.js
You can now deploy your application as you did earlier with mvn spring-boot:run, and head over to http://localhost:8080/index.html to see the fruits of your labor and play around until you get bored!
Conclusion
RSocket could very well become the future of transport protocols as it rapidly transitions into a mature product, with an active community regularly contributing to its growth.
I hope you got the gist of it after reading this article, but if you still want to learn more don’t be afraid to head over to:
…or dig some more into my rsocket-demo source code!
Further Reading
Reactive Service-to-Service Communication With RSocket (Part 1)
Published at DZone with permission of Domenico Sibilio. See the original article here.