Reactive Streams in Java 9
Let's take a deep dive into Reactive Streams in JDK 9, their defining philosophy of asynchronous communication, and how best to use them in your code.
The release of Java Development Kit (JDK) 9 in September of last year brought with it many improvements to both the Java Virtual Machine (JVM) and the Java Language Specification (JLS). Foremost among these additions was the inclusion of JDK Enhancement Proposal (JEP) 261: The Module System (nicknamed Project Jigsaw), which introduced one of the most comprehensive overhauls of the JVM since its creation. Behind all of the fanfare, there were also 85 non-module JEPs contained in the release, including the addition of a Hypertext Transfer Protocol (HTTP) 2 client (JEP 110), jshell (JEP 222), and important concurrency updates (JEP 266). Within these concurrency updates, a major improvement was made to the Java standard libraries: Reactive Streams.
The Reactive Streams initiative was started in 2013 by some of the most influential web application companies (including Netflix) as a means to standardize the exchange of asynchronous data between software components. As more framework and library builders supported this initiative and its specification, various Java implementations of the standard began to appear. Due to the simplicity of the Reactive Streams standard, many of the interfaces that made up these implementations were identical, differing only in their package names. To reduce this duplication and the resulting import incompatibilities, Java 9 now includes basic interfaces for each of the fundamental Reactive Streams concepts in the Flow class of the java.util.concurrent package. This allows all Java applications to depend on these standard interfaces, rather than on those of a specific implementation.
In this article, we will explore the Reactive Streams standard and its implementation in Java. While there are many rich Reactive Streams frameworks, such as ReactiveX, we will not focus on these third-party implementations. Instead, we will deep dive into the official Java implementation of this standard and how this implementation can be used to create applications that respond to asynchronous data streams.
It should be noted that the purpose of this common set of official Java reactive interfaces is to consolidate the various Reactive interfaces into a single location and is not intended for standalone or custom implementations. Creating a custom implementation of the Reactive Streams standard can be error-prone and should be verified by the Reactive Streams Technology Compatibility Kit (TCK). If a developer wishes to use Reactive Streams in their application, he or she should use an existing implementation of the standard, such as ReactiveX or Akka. Likewise, the examples in this article are pedagogical and intended for demonstration purposes only; they should not be used in production applications (a well-tested Reactive Streams implementation should be used instead).
What Are Reactive Streams?
In many applications, data is not retrieved from a fixed storage device, but rather handled in near-real-time, with users or other systems rapidly injecting information into our system. In most cases, this data injection is asynchronous: we do not know ahead of time when the data will be present. To facilitate this asynchronous style of data handling, we have to rethink older polling-based models and instead use a lighter, more streamlined method.
Publishers, Subscribers, and Subscriptions
This is where reactive streams come into their own. Instead of a client-server style of data handling, where a client requests data from a server and the server responds, possibly asynchronously, with the requested data, we use a publish-subscribe mechanism: a subscriber informs a publisher that it is willing to accept a given number of items (that is, it requests a given number of items), and if items are available, the publisher pushes as many of them as it can, up to the requested number, to the subscriber. It is important to note that this is two-way communication: the subscriber tells the publisher how many items it is willing to handle, and the publisher pushes at most that number of items to the subscriber.
The process of restricting the number of items that a subscriber is willing to accept (as judged by the subscriber itself) is called backpressure and is essential in preventing the subscriber from being overloaded (pushed more items than it can handle). This scheme is illustrated in the figure below.
This two-way connection between a publisher and a subscriber is called a subscription. A subscription binds a single publisher to a single subscriber (a one-to-one relationship) and may be unicast or multicast. A single publisher may have multiple subscribers subscribed to it, but a single subscriber may be subscribed to at most one publisher.
When a subscriber subscribes to a publisher, the publisher notifies the subscriber of the subscription that was created, allowing the subscriber to store a reference to the subscription (if desired). Once this notification process is completed, the subscriber can inform the publisher that it is ready to receive some number n of items.
When the publisher has items available, it sends at most n items to the subscriber. If an error occurs in the publisher, it signals the error to the subscriber. If the publisher is permanently finished sending data, it signals the subscriber that it is complete. If the subscriber is notified that either an error occurred or the publisher is complete, the subscription is considered canceled, and no more interactions between the publisher and subscriber (or the subscription) will take place. This subscription workflow is illustrated in the figure below.
It is important to note that there are two theoretical approaches to streaming data to a subscriber: (1) the subscription holds the items or (2) the publisher holds the items. In the first case, the publisher pushes items to the subscription when they become available; when, at a later time, the subscriber requests n items, the subscription provides n or fewer items it has previously been given by the publisher. This may be used when the publisher manages queued items, such as incoming HTTP requests. In the second case, the subscriber forwards requests to the publisher, which pushes n or fewer items to the subscription, which in turn pushes those same items to the subscriber. This scenario may be more suitable for instances where items are generated as needed, such as with a prime number generator.
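To make the second approach concrete, here is a minimal sketch of an on-demand prime number source. It deliberately leaves out the Flow interfaces discussed later: the hypothetical LazyPrimeSource class and its request method only show that nothing is computed until a request for n items arrives, and that exactly n items are produced in response.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical on-demand source (not a Reactive Streams API): primes are
// generated only when requested. Not thread-safe; for illustration only.
public class LazyPrimeSource {
    private long candidate = 2; // next number to test for primality

    // Computes exactly n more primes; nothing is produced ahead of demand.
    public List<Long> request(int n) {
        List<Long> batch = new ArrayList<>(n);
        while (batch.size() < n) {
            if (isPrime(candidate)) {
                batch.add(candidate);
            }
            candidate++;
        }
        return batch;
    }

    // Simple trial division, sufficient for a demonstration.
    private static boolean isPrime(long x) {
        if (x < 2) {
            return false;
        }
        for (long d = 2; d * d <= x; d++) {
            if (x % d == 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String... args) {
        LazyPrimeSource source = new LazyPrimeSource();
        System.out.println(source.request(5)); // [2, 3, 5, 7, 11]
        System.out.println(source.request(3)); // [13, 17, 19]
    }
}
```

Because the source remembers where it stopped, a second request resumes from the next candidate instead of recomputing earlier primes.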
It is also important to note that items do not have to be present before a request can be made. If a subscriber requests n items and no items are available, the subscriber waits until at least one item is available and is pushed to it. If there are i items available when the subscriber requests n items, where i is less than n, the i items are pushed to the subscriber. Once j more items become available, up to n - i of them are pushed as well, until a total of n items has been pushed to the subscriber. If, in the meantime, the subscriber has requested m more items, all j items may be pushed so long as i + j is less than or equal to n + m. The number of items that a subscriber can accept at any given time (which may or may not be equal to n, depending on the number of items already pushed to the subscriber) is called the outstanding demand.
For example, suppose a subscriber requests 5 items and 7 are currently available in the publisher. The outstanding demand for the subscriber is 5, so 5 of the 7 items are pushed to the subscriber. The remaining 2 items are kept by the publisher until the subscriber requests more items. If the subscriber then requests 10 more items, the 2 remaining items are pushed to the subscriber, resulting in an outstanding demand of 8. If 5 more items become available in the publisher, these 5 items are pushed to the subscriber, leaving an outstanding demand of 3. The outstanding demand will remain at 3 unless the subscriber requests n more items, in which case it increases to 3 + n, or i more items are pushed to the subscriber, in which case it decreases to 3 - i (to a minimum of 0).
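The bookkeeping in this example can be replayed with a small, single-threaded model. The DemandSimulation class below is a hypothetical illustration, not part of any Reactive Streams API: the publisher holds items in a queue, and every request or newly produced item triggers a drain that pushes as many items as the outstanding demand allows.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded model of outstanding demand (not a Flow implementation).
public class DemandSimulation {
    private final Deque<Integer> queue = new ArrayDeque<>(); // items held by the publisher
    private int nextId = 0;             // label for the next produced item
    private int delivered = 0;          // items pushed to the subscriber so far
    private long outstandingDemand = 0; // items the subscriber can still accept

    // The subscriber requests n more items.
    public void request(long n) {
        outstandingDemand += n;
        drain();
    }

    // The publisher obtains `count` new items.
    public void produce(int count) {
        for (int i = 0; i < count; i++) {
            queue.addLast(nextId++);
        }
        drain();
    }

    // Push as many held items as the outstanding demand allows.
    private void drain() {
        while (outstandingDemand > 0 && !queue.isEmpty()) {
            queue.removeFirst(); // "push" the oldest held item to the subscriber
            delivered++;
            outstandingDemand--;
        }
    }

    public long outstandingDemand() { return outstandingDemand; }
    public int held() { return queue.size(); }
    public int delivered() { return delivered; }

    public static void main(String... args) {
        DemandSimulation sim = new DemandSimulation();
        sim.produce(7);  // 7 items are available in the publisher
        sim.request(5);  // 5 are pushed, 2 are held back
        System.out.println("demand=" + sim.outstandingDemand() + ", held=" + sim.held()); // demand=0, held=2
        sim.request(10); // the 2 held items are pushed
        System.out.println("demand=" + sim.outstandingDemand() + ", held=" + sim.held()); // demand=8, held=0
        sim.produce(5);  // the 5 new items are pushed immediately
        System.out.println("demand=" + sim.outstandingDemand() + ", held=" + sim.held()); // demand=3, held=0
    }
}
```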
Processors
If an entity is both a publisher and a subscriber, it is called a processor. A processor commonly acts as an intermediary between another publisher and subscriber (either of which may be another processor), performing some transformation on the stream of data. For example, a processor can be created that filters out items matching some criterion before passing the remaining items on to its subscriber. A visual representation of a processor is illustrated in the figure below.
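As a sketch of such a filtering processor, the following assumes the JDK's java.util.concurrent.Flow interfaces and SubmissionPublisher class, both introduced later in this article. The FilterProcessor class and its predicate are hypothetical; it reuses SubmissionPublisher for its publishing side and requests one item at a time from upstream, whether or not that item passes the filter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Predicate;

public class FilterExample {

    // Hypothetical processor: forwards only the items that pass `keep`.
    static class FilterProcessor<T> extends SubmissionPublisher<T>
            implements Flow.Processor<T, T> {
        private final Predicate<? super T> keep;
        private Flow.Subscription subscription;

        FilterProcessor(Predicate<? super T> keep) {
            this.keep = keep;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            if (keep.test(item)) {
                submit(item); // pass matching items downstream
            }
            subscription.request(1); // ask for the next item either way
        }

        @Override
        public void onError(Throwable error) {
            closeExceptionally(error); // forward the error downstream
        }

        @Override
        public void onComplete() {
            close(); // forward completion downstream
        }
    }

    // Publishes 0..9 through the filter and waits for the chain to complete.
    public static List<Integer> evens() {
        List<Integer> received = new ArrayList<>();
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        FilterProcessor<Integer> filter = new FilterProcessor<>(i -> i % 2 == 0);
        publisher.subscribe(filter);
        CompletableFuture<Void> done = filter.consume(received::add);
        for (int i = 0; i < 10; i++) {
            publisher.submit(i);
        }
        publisher.close();
        done.join(); // completes once the filter signals onComplete
        return received;
    }

    public static void main(String... args) {
        System.out.println(evens()); // [0, 2, 4, 6, 8]
    }
}
```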
With a foundational understanding of how reactive streams operate, we can transform these concepts into the realm of Java by codifying them into interfaces.
Interface Representation
Given the description above, Reactive Streams are made up of four main entities: (1) publishers, (2) subscribers, (3) subscriptions, and (4) processors. From an interface perspective, publishers are only required to allow subscribers to subscribe to them. Therefore, we can create a simple interface for a publisher, where the formal generic type parameter, T, represents the type of the items that the publisher produces:
```java
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
```
This interface definition requires that we subsequently define the interface for a subscriber. As stated above, a subscriber has four main interactions: (1) notification of being subscribed, (2) accepting pushed items, (3) accepting errors that occur in a subscribed publisher, and (4) notification when a publisher is complete. This results in the following interface, which is likewise parameterized by the type of the items it requests:
```java
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
```
Next, we must define the interface for a subscription. This entity is simpler than a subscriber and responsible for only two actions: (1) accepting requests for items and (2) being canceled. This results in the following interface definition:
```java
public interface Subscription {
    public void request(long n);
    public void cancel();
}
```
Lastly, we define a processor as a combination of the publisher and subscriber interfaces, with an important quirk: a processor may produce items of a different type than the type of the items it consumes. Therefore, we will use the formal generic type parameter T to represent the type of the items the processor consumes and R to represent the type of the items it returns (or produces). Note that an implementation of a processor may consume and produce items of the same type, but there is no compile-time restriction that it must do so. This results in the following interface:
```java
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
```
While these four interfaces constitute the codified contract for Reactive Streams, there are a number of other restrictions and intended behaviors that implementations of these interfaces must conform to. These specifications, along with the above interface definitions, can be found in the Reactive Streams JVM Specification. As we will see in the next section, the standard Java version of these interfaces is nearly identical to the one in the Reactive Streams JVM Specification and acts as a standardization of the Reactive Streams contracts within the Java Standard Library.
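To give a feel for a few of these rules, here is a minimal, single-threaded sketch of a publisher written against the JDK's java.util.concurrent.Flow interfaces (covered in the next section). The RangePublisher and RangeSubscription classes are hypothetical and not TCK-verified. They illustrate three requirements from the specification: a non-positive request(n) is signalled as onError with an IllegalArgumentException, accumulated demand that overflows is capped at Long.MAX_VALUE, and a request made from within onNext does not recurse, which keeps the stack depth bounded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class RangePublisherExample {

    // Hypothetical publisher of the integers in [start, end). Items are emitted on
    // the thread that calls request(); assumes requests arrive from one thread.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int end;

        RangePublisher(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new RangeSubscription(subscriber, start, end));
        }
    }

    static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int end;
        private int next;         // next value to emit
        private long demand;      // outstanding demand
        private boolean emitting; // true while the emission loop is running
        private boolean done;     // cancelled or terminated

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int start, int end) {
            this.subscriber = subscriber;
            this.next = start;
            this.end = end;
        }

        @Override
        public void request(long n) {
            if (done) {
                return; // requests after cancellation or termination are no-ops
            }
            if (n <= 0) {
                done = true; // non-positive requests are signalled as errors
                subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                return;
            }
            // Accumulate demand, capping at Long.MAX_VALUE ("effectively unbounded")
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
            if (emitting) {
                return; // re-entrant call from onNext: the running loop sees the new demand
            }
            emitting = true;
            while (demand > 0 && next < end && !done) {
                demand--;
                subscriber.onNext(next++);
            }
            if (next >= end && !done) {
                done = true; // all values emitted: terminate exactly once
                subscriber.onComplete();
            }
            emitting = false;
        }

        @Override
        public void cancel() {
            done = true; // idempotent
        }
    }

    // Requests one item up front and one more per received item, then returns them.
    static List<Integer> collect(Flow.Publisher<Integer> publisher) {
        List<Integer> items = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(Integer item) { items.add(item); subscription.request(1); }
            @Override public void onError(Throwable t) { t.printStackTrace(); }
            @Override public void onComplete() { }
        });
        return items;
    }

    public static void main(String... args) {
        System.out.println(collect(new RangePublisher(0, 5))); // [0, 1, 2, 3, 4]
        // Without the `emitting` guard, this would recurse once per item
        System.out.println(collect(new RangePublisher(0, 100_000)).size()); // 100000
    }
}
```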
How Do Reactive Streams Work in Java?
The standard Java port of the Reactive Streams interfaces is found in the java.util.concurrent.Flow class, where the interfaces are bundled as static nested interfaces. With the JavaDocs removed, the Flow class is defined as follows:
```java
public final class Flow {
    private Flow() {} // uninstantiable

    @FunctionalInterface
    public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }

    public static interface Subscriber<T> {
        public void onSubscribe(Subscription subscription);
        public void onNext(T item);
        public void onError(Throwable throwable);
        public void onComplete();
    }

    public static interface Subscription {
        public void request(long n);
        public void cancel();
    }

    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}

    // The JDK class also declares a static defaultBufferSize() method, omitted here.
}
```
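Because Flow.Publisher is annotated with @FunctionalInterface, a publisher can be written as a lambda. The following sketch relies only on java.util.concurrent.Flow; the empty() and signalsFrom() helpers are hypothetical. The publisher has nothing to emit, so it hands the subscriber a subscription whose methods do nothing and then completes immediately. For brevity, it ignores the specification's rule that a non-positive request(n) must be signalled as an error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class EmptyPublisherExample {

    // Hypothetical lambda-based publisher with no items: subscribe, then complete.
    static <T> Flow.Publisher<T> empty() {
        return subscriber -> {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { } // nothing to deliver
                @Override public void cancel() { }        // nothing to release
            });
            subscriber.onComplete(); // completion does not require a prior request
        };
    }

    // Subscribes and records the order of the signals received.
    static List<String> signalsFrom(Flow.Publisher<String> publisher) {
        List<String> signals = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { signals.add("onSubscribe"); }
            @Override public void onNext(String item) { signals.add("onNext"); }
            @Override public void onError(Throwable t) { signals.add("onError"); }
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }

    public static void main(String... args) {
        System.out.println(signalsFrom(empty())); // [onSubscribe, onComplete]
    }
}
```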
While there is not much new to discuss when comparing the Reactive Streams JVM specification to the standard Java definitions, the standard Java version does include one publisher implementation: SubmissionPublisher. The SubmissionPublisher class acts as a simple publisher, which accepts items to push to subscribers using a submit(T item) method. When an item is submitted to the submit method, it is asynchronously pushed to subscribers, as in the following example:
```java
// PrintSubscriber.java
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class PrintSubscriber implements Subscriber<Integer> {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        // Keep the subscription so we can request more items later
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Received item: " + item);
        // Ready for the next item
        subscription.request(1);
    }

    @Override
    public void onError(Throwable error) {
        System.out.println("Error occurred: " + error.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("PrintSubscriber is complete");
    }
}

// SubmissionPublisherExample.java
import java.util.concurrent.SubmissionPublisher;

public class SubmissionPublisherExample {
    public static void main(String... args) throws InterruptedException {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new PrintSubscriber());
        System.out.println("Submitting items...");
        for (int i = 0; i < 10; i++) {
            publisher.submit(i);
        }
        // Give the asynchronous deliveries time to finish
        Thread.sleep(1000);
        publisher.close();
    }
}
```
Running this example results in the following output:
```
Submitting items...
Received item: 0
Received item: 1
Received item: 2
Received item: 3
Received item: 4
Received item: 5
Received item: 6
Received item: 7
Received item: 8
Received item: 9
PrintSubscriber is complete
```
Within our subscriber, we capture the Subscription object that was passed to the onSubscribe method, allowing us to interact with the Subscription at a later time. Once we store the Subscription object, we immediately inform the Subscription that our subscriber is ready to accept one item (by calling subscription.request(1)). We do likewise within the onNext method after we print the received item. This amounts to informing the publisher that we are ready to accept another item as soon as we have finished processing an item.
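Requesting one item at a time is the simplest policy, but a subscriber can ask for more. The sketch below, with a hypothetical BatchingSubscriber and an arbitrarily chosen batch size of 4, requests items in batches and asks for the next batch only after the current one has been consumed. Rather than sleeping, it waits on a CountDownLatch that is released by onComplete or onError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BatchingExample {

    // Hypothetical subscriber that requests `batchSize` items at a time.
    static class BatchingSubscriber implements Flow.Subscriber<Integer> {
        private final long batchSize;
        private final List<Integer> received = new ArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private long remainingInBatch;

        BatchingSubscriber(long batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            remainingInBatch = batchSize;
            subscription.request(batchSize); // ask for the first batch
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--remainingInBatch == 0) { // batch consumed: ask for the next one
                remainingInBatch = batchSize;
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }

        // Blocks until a terminal signal arrives, then returns the items.
        List<Integer> awaitItems() {
            try {
                done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            return received;
        }
    }

    public static List<Integer> run() {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        BatchingSubscriber subscriber = new BatchingSubscriber(4);
        publisher.subscribe(subscriber);
        for (int i = 0; i < 10; i++) {
            publisher.submit(i);
        }
        publisher.close();
        return subscriber.awaitItems();
    }

    public static void main(String... args) {
        System.out.println(run()); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Larger batches mean fewer request calls, at the cost of letting more items queue up in the subscriber at once.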
In our main method, we simply instantiate a SubmissionPublisher and our PrintSubscriber and subscribe the latter to the former. Once the subscription is established, we submit the values 0 through 9 to the publisher, which in turn asynchronously pushes the values to the subscriber. The subscriber then handles each item by printing its value to standard output and informs the subscription that it is ready to accept another value. We then pause the main thread for 1 second to allow the asynchronous deliveries to complete. This is an important step, since the submit method pushes the submitted items to its subscribers asynchronously; without the pause, the program could exit before all items are printed. Sleeping is a simple heuristic rather than a guarantee, but it gives the asynchronous work a reasonable amount of time to finish. Lastly, we close the publisher, which in turn notifies our subscriber that the subscription has completed.
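Where a fixed pause feels fragile, SubmissionPublisher also offers consume(Consumer), which subscribes the given consumer and returns a CompletableFuture that completes when the publisher signals onComplete (or completes exceptionally on error). The sketch below uses it in place of Thread.sleep; the publishAndCollect helper is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;

public class ConsumeExample {

    // Publishes 0..count-1 and waits for delivery to finish without sleeping.
    public static List<Integer> publishAndCollect(int count) {
        List<Integer> received = new ArrayList<>();
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        // consume() subscribes the consumer and returns a future tied to completion
        CompletableFuture<Void> done = publisher.consume(received::add);
        for (int i = 0; i < count; i++) {
            publisher.submit(i);
        }
        publisher.close(); // onComplete is signalled after the submitted items
        done.join();       // block until the consumer has seen onComplete
        return received;
    }

    public static void main(String... args) {
        System.out.println(publishAndCollect(10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```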
We can also introduce a processor and chain the original publisher and subscriber with this processor. In the following example, we create a processor that increments received values by 10 and pushes the incremented values to its subscriber:
```java
// PlusTenProcessor.java
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

public class PlusTenProcessor extends SubmissionPublisher<Integer> implements Subscriber<Integer> {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // Publish the transformed item to our own subscribers
        submit(item + 10);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable error) {
        error.printStackTrace();
        // Forward the error downstream
        closeExceptionally(error);
    }

    @Override
    public void onComplete() {
        System.out.println("PlusTenProcessor completed");
        // Forward completion downstream
        close();
    }
}

// SubmissionPublisherExample.java
import java.util.concurrent.SubmissionPublisher;

public class SubmissionPublisherExample {
    public static void main(String... args) throws InterruptedException {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        PlusTenProcessor processor = new PlusTenProcessor();
        PrintSubscriber subscriber = new PrintSubscriber();
        // Chain: publisher -> processor -> subscriber
        publisher.subscribe(processor);
        processor.subscribe(subscriber);
        System.out.println("Submitting items...");
        for (int i = 0; i < 10; i++) {
            publisher.submit(i);
        }
        Thread.sleep(1000);
        publisher.close();
    }
}
```
Running this example results in the following output:
```
Submitting items...
Received item: 10
Received item: 11
Received item: 12
Received item: 13
Received item: 14
Received item: 15
Received item: 16
Received item: 17
Received item: 18
Received item: 19
PlusTenProcessor completed
PrintSubscriber is complete
```
As we expected, each of the pushed values is incremented by 10, and the events received by the processor (such as receiving an error or completing) are forwarded to the subscriber, resulting in a completed message printed for both the PlusTenProcessor and the PrintSubscriber.
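Error forwarding can be checked in a similar way. In the following sketch, the hypothetical ForwardingProcessor follows PlusTenProcessor's pattern without the transformation: its onError calls closeExceptionally, so an error raised upstream with SubmissionPublisher.closeExceptionally should surface at the end of the chain, where the future returned by consume completes exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ErrorPropagationExample {

    // Hypothetical pass-through processor that forwards items and terminal signals.
    static class ForwardingProcessor extends SubmissionPublisher<Integer>
            implements Flow.Processor<Integer, Integer> {
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
        @Override public void onNext(Integer item) { submit(item); subscription.request(1); }
        @Override public void onError(Throwable error) { closeExceptionally(error); } // forward the error
        @Override public void onComplete() { close(); }                               // forward completion
    }

    // Fails the upstream publisher and returns the error seen at the end of the chain.
    public static Throwable propagate(Throwable failure) {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        ForwardingProcessor processor = new ForwardingProcessor();
        publisher.subscribe(processor);
        CompletableFuture<Void> done = processor.consume(item -> { });
        publisher.closeExceptionally(failure);
        try {
            done.join();
            return null; // not expected: the chain completed normally
        } catch (CompletionException e) {
            return e.getCause(); // the error forwarded by the processor
        }
    }

    public static void main(String... args) {
        Throwable seen = propagate(new IllegalStateException("upstream failed"));
        System.out.println(seen.getMessage()); // upstream failed
    }
}
```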
Conclusion
In the age of near-real-time data processing, Reactive Streams are a de facto standard. The ubiquity of this programming style has led to numerous implementations of this standard, each with its own duplicate set of interfaces. In an effort to collect these common interfaces into a universal standard for Java, JDK 9 now includes Reactive Streams interfaces, along with a powerful publisher implementation, by default. As we have seen in this article, while these interfaces are unassuming in appearance, they provide a rich method for handling streaming data flows in a standard, interchangeable manner.