Tutorial: Reactive Spring Boot, Part 7: Subscribing Multiple Consumers
Learn more about subscribing to multiple consumers in this seventh installment on building a Reactive Spring Boot application.
Join the DZone community and get the full member experience.
Join For FreeThis is the seventh part of our tutorial showing how to build a Reactive application using Spring Boot, Kotlin, Java, and JavaFX. The original inspiration was a 70-minute live demo.
This tutorial is a series of steps during which we will build a full Spring Boot application featuring a Kotlin back-end, a Java client, and a JavaFX user interface.
Here is everything you've missed so far:
Tutorial: Reactive Spring Boot, Part 1: Building a Kotlin REST Service
Tutorial: Reactive Spring Boot, Part 2: A REST Client for Reactive Streams
Tutorial: Reactive Spring Boot, Part 3: A JavaFX Spring Boot Application
Tutorial: Reactive Spring Boot, Part 4: A JavaFX Line Chart
Tutorial: Reactive Spring Boot, Part 5: Auto-Configuration for Shared Beans
Tutorial: Reactive Spring Boot, Part 6: Displaying Reactive Data
In the last step, we made our JavaFX line chart subscribe to prices from our Reactive Spring Boot service and display them in real-time.
In this lesson, we update our live-updating chart to show prices for more than one stock, which means subscribing more than one consumer to our reactive stream of prices.
Below, you will find a video showing the process step-by-step and a textual walk-through (adapted from the transcript of the video) for those who prefer a written format.
Introducing a Price Subscriber
The ChartController
, which manages how the data gets to the view, contains a call that subscribes the ChartController
itself to the client that listens to the price service (look at the previous post to see the starting state of ChartController
for this lesson). We need to change this so we can have more than one subscriber.
- Inside the subscribe call in
ChartController
, replacethis
with a call to the constructor of a new type,PriceSubscriber
. This class will manage everything to do with consuming prices from the client. - Create
PriceSubscriber
as an inner class, it needs to implement Consumer and consumeStockPrice
. - (Tip: IntelliJ IDEA will offer to automatically create this for us if we press Alt+Enter on the PriceSubscriber name after we first type it.)
public void initialize() {
// other code inside the initialize method here...
webClientStockClient.pricesFor(symbol)
.subscribe(new PriceSubscriber());
}
private class PriceSubscriber implements Consumer<StockPrice> {
}
Moving Responsibilities to the PriceSubscriber
- Move the accept method from the
ChartController
and into thePriceSubscriber
. - Remove the “implements Consumer” declaration from the
ChartController
. - Move
seriesData
from theChartController
intoPriceSubscriber
. - Call the
PriceSubscriber
constructor with the symbol, and update thePriceSubscriber
to accept this as a constructor parameter. - (Tip: We can get IntelliJ IDEA to create the appropriate constructor if we pass symbol in, press Alt+Enter and select “Create constructor).
- Move the creation of the Series into the
PriceSubscriber
constructor and store the series as a field. - Add a getter for series because the
ChartController
will need to get this series to add it to the chart. - (Tip: IntelliJ IDEA can generate getters from fields.)
private class PriceSubscriber implements Consumer<StockPrice> {
private Series<String, Double> series;
private ObservableList<Data<String, Double>> seriesData = observableArrayList();
private PriceSubscriber(String symbol) {
series = new Series<>(symbol, seriesData);
}
@Override
public void accept(StockPrice stockPrice) {
Platform.runLater(() ->
seriesData.add(new Data<>(valueOf(stockPrice.getTime().getSecond()),
stockPrice.getPrice()))
);
}
public Series<String, Double> getSeries() {
return series;
}
}
Let’s fix up the initialize method of ChartController
.
- Extract the new
PriceSubscriber
into a local variable; we’re going to need to use this elsewhere in the method. - Move this
priceSubscriber
near the top of the method, and insidedata.add
call the getter.
@Component
public class ChartController {
@FXML
private LineChart<String, Double> chart;
private WebClientStockClient webClientStockClient;
public ChartController(WebClientStockClient webClientStockClient) {
this.webClientStockClient = webClientStockClient;
}
@FXML
public void initialize() {
String symbol = "SYMBOL";
PriceSubscriber priceSubscriber = new PriceSubscriber(symbol);
ObservableList<Series<String, Double>> data = observableArrayList();
data.add(priceSubscriber.getSeries());
chart.setData(data);
webClientStockClient.pricesFor(symbol)
.subscribe(priceSubscriber);
}
private class PriceSubscriber implements Consumer<StockPrice> {
// details in previous code snippet
}
}
If we re-run the application, the chart should run the same way as it did before (assuming we still have the back-end service running as well), since we haven’t changed the behavior, just refactored the subscription code.
Adding a Second Subscriber
- Rename
symbol
tosymbol1
; since we’re going to have another one of these, rename the symbol itself, and let’s also rename ourpriceSubscriber
as well. - (Tip: Use the rename refactoring, Shift+F6, to do this; it makes sure all the code still compiles).
- Duplicate these lines and change the variable names to
symbol2
andpriceSubscriber2
. - (Tip: The keyboard shortcut for Duplicate Line is Ctrl+D/⌘D.)
- Add the second series to the chart to display this second set of prices.
- Duplicate the subscribe code and pass in the second symbol and second subscriber.
public void initialize() {
String symbol1 = "SYMBOL1";
PriceSubscriber priceSubscriber1 = new PriceSubscriber(symbol1);
String symbol2 = "SYMBOL2";
PriceSubscriber priceSubscriber2 = new PriceSubscriber(symbol2);
ObservableList<Series<String, Double>> data = observableArrayList();
data.add(priceSubscriber1.getSeries());
data.add(priceSubscriber2.getSeries());
chart.setData(data);
webClientStockClient.pricesFor(symbol1)
.subscribe(priceSubscriber1);
webClientStockClient.pricesFor(symbol2)
.subscribe(priceSubscriber2);
}
Now when we re-run the application, we can see two different series on the chart tracking the prices for two different stocks, like you can see at 3:30 in the video.
Code Clean-Up
Now that we have the application working the way we want, we can refactor the code if we wish.
- We could move the call to subscribe to the top near where we create the subscriber.
- We can clean up any warnings, for example making the inner class static.
public void initialize() {
String symbol1 = "SYMBOL1";
PriceSubscriber priceSubscriber1 = new PriceSubscriber(symbol1);
webClientStockClient.pricesFor(symbol1)
.subscribe(priceSubscriber1);
String symbol2 = "SYMBOL2";
PriceSubscriber priceSubscriber2 = new PriceSubscriber(symbol2);
webClientStockClient.pricesFor(symbol2)
.subscribe(priceSubscriber2);
ObservableList<Series<String, Double>> data = observableArrayList();
data.add(priceSubscriber1.getSeries());
data.add(priceSubscriber2.getSeries());
chart.setData(data);
}
Bonus Refactoring (Not in the Video!)
There are a number of other ways we could further refactor this to make it easier to read, to reduce duplication, or to group responsibilities differently. For example, we could move the call to subscribe to the PriceSubscriber
to reduce duplication. We can even use var
if we’re running a version of Java that supports it.
public class ChartController {
// ... fields and constructor ...
@FXML
public void initialize() {
var priceSubscriber1 = new PriceSubscriber("SYMBOL1", webClientStockClient);
var priceSubscriber2 = new PriceSubscriber("SYMBOL2", webClientStockClient);
ObservableList<Series<String, Double>> data = observableArrayList();
data.add(priceSubscriber1.getSeries());
data.add(priceSubscriber2.getSeries());
chart.setData(data);
}
private static class PriceSubscriber implements Consumer<StockPrice> {
private Series<String, Double> series;
private ObservableList<Data<String, Double>> seriesData = observableArrayList();
private PriceSubscriber(String symbol, WebClientStockClient stockClient) {
series = new Series<>(symbol, seriesData);
stockClient.pricesFor(symbol)
.subscribe(this);
}
@Override
public void accept(StockPrice stockPrice) {
Platform.runLater(() ->
seriesData.add(new Data<>(valueOf(stockPrice.getTime().getSecond()),
stockPrice.getPrice()))
);
}
private Series<String, Double> getSeries() {
return series;
}
}
}
Conclusion
So now, we have a JavaFX application that subscribes to more than one price from our Spring Boot service, and displays each set of price data in real-time in multiple series on a line chart.
The full code is available on GitHub.
Further Reading
Tutorial: Reactive Spring Boot, Part 2: A REST Client for Reactive Streams
Tutorial: Reactive Spring Boot, Part 3: A JavaFX Spring Boot Application
Tutorial: Reactive Spring Boot, Part 4: A JavaFX Line Chart
Tutorial: Reactive Spring Boot, Part 5: Auto-Configuration for Shared Beans
Tutorial: Reactive Spring Boot, Part 6: Displaying Reactive Data
Published at DZone with permission of Trisha Gee, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments