Now that we've explored theory in the previous chapter, let's get some hands-on practice with the Flow SPI!
Building Java Reactive Streams with Flow
In JDK 9 the Flow SPI was introduced to Java. The Flow package introduced a set of interfaces describing the reactive components of the Reactive Streams specification.
Flow is a service provider interface (SPI) intended for framework makers to have available as a reference implementation. It's not intended for production code, but it's great for learning and prototyping. Flow matches what you'd expect to see in the many reactive Java frameworks such as RxJava. We'll look at this a little later.
The main interfaces and classes required to build a reactive application are:
Interface | Description | Example |
Flow.Publisher<T> | This is an interface that matches the publisher described in the Reactive Streams specification (see above). It implements the | |
Flow.Subscriber<T> | This interface matches the subscriber described in the Reactive Streams specification. It should be implemented with all the methods described above. | |
Flow.Processor<T,R> | As per the Reactive Streams specification, this extends both the publisher and subscriber interfaces. | |
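To make the three roles a little more concrete, here is a minimal sketch of a processor, following the pattern shown in the SubmissionPublisher Javadoc. The class name CelsiusToFahrenheitProcessor, the conversion, and the convert() helper are assumptions made up for illustration; they are not part of the course code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical processor: subscribes to Celsius readings and republishes them in Fahrenheit.
class CelsiusToFahrenheitProcessor extends SubmissionPublisher<Double>
        implements Flow.Processor<Double, Double> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Double celsius) {
        // Subscriber side receives the item, publisher side passes on the converted value.
        submit(celsius * 9.0 / 5.0 + 32.0);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        closeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        close();
    }

    // Illustrative helper: pushes readings through the processor and collects the results.
    static List<Double> convert(List<Double> celsiusReadings) throws Exception {
        List<Double> results = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Double> source = new SubmissionPublisher<>();
        CelsiusToFahrenheitProcessor processor = new CelsiusToFahrenheitProcessor();
        source.subscribe(processor);
        CompletableFuture<Void> done = processor.consume(results::add);
        celsiusReadings.forEach(source::submit);
        source.close();
        // Wait until the processor has forwarded everything and closed.
        done.get(10, TimeUnit.SECONDS);
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(convert(List.of(0.0, 100.0))); // prints [32.0, 212.0]
    }
}
```

The processor is a subscriber to the source and a publisher to whatever consumes it, which is exactly the dual role described in the table.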
Implementing a Subscriber
Let's build a subscriber that receives a stream of temperatures:
Let's break this down:
Step 1: Create a subscriber by implementing java.util.concurrent.Flow.Subscriber:
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
public class TemperatureSubscriber implements Flow.Subscriber<Double> {
}
Step 2: Implement each of the methods in the Flow.Subscriber interface:
public class TemperatureSubscriber implements Flow.Subscriber<Double> {
    @Override
    public void onSubscribe(Subscription subscription) {
    }
    @Override
    public void onNext(Double temperature) {
    }
    @Override
    public void onError(Throwable throwable) {
    }
    @Override
    public void onComplete() {
    }
}
Step 3: Create a Subscription subscription field on the subscriber so you can manage the subscription after it's passed to onSubscribe().
public class TemperatureSubscriber implements Flow.Subscriber<Double> {
    private Subscription subscription;
    ...
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
    }
    ...
}
Step 4: Call subscription.request(1) from the onSubscribe() method.
This requests a single message from the publisher. In most cases, you'd probably want to set this to a larger number, to queue up more messages for your consumer. If the publisher stops at any point, this is the number of messages you can expect to process before the subscriber also has to stop.
@Override
public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
}
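The idea of requesting more than one message at a time can be sketched like this. The batch size of 10 and the class name BatchingSubscriber are assumptions for illustration; the right number depends on how quickly your consumer works.

```java
import java.util.concurrent.Flow;

// Hypothetical subscriber that asks for items in batches instead of one at a time.
class BatchingSubscriber implements Flow.Subscriber<Double> {
    private static final int BATCH_SIZE = 10; // assumed value, tune for your consumer
    private Flow.Subscription subscription;
    private int remainingInBatch;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        remainingInBatch = BATCH_SIZE;
        subscription.request(BATCH_SIZE); // queue up a whole batch
    }

    @Override
    public void onNext(Double temperature) {
        System.out.println("Temperature: " + temperature);
        // Only ask for more once the current batch has been fully processed.
        if (--remainingInBatch == 0) {
            remainingInBatch = BATCH_SIZE;
            subscription.request(BATCH_SIZE);
        }
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
}
```

With this approach the publisher never has more than one batch outstanding for this subscriber, so the batch size is also the most it can be asked to deliver at once.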
Step 5: Implement an onNext(T message) method that gets called with each item published by the publisher. The magic of Reactive Streams is that this will just get called when there is data ready. You won't have to do anything other than subscribe to a publisher.
@Override
public void onNext(Double temperature) {
    System.out.println("Temperature: " + temperature);
    // Get the next item
    subscription.request(1);
}
Line 3: Do something with the message, in this case, just print it.
Line 5: By calling subscription.request(1), you ensure that more data keeps flowing from the publisher to the subscriber. You'll have to keep doing this.
Those were the main methods to worry about.
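Putting the five steps together gives something like the following. The steps above leave onError() and onComplete() empty, so what they do here (print a message) is an assumption added for illustration.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;

class TemperatureSubscriber implements Flow.Subscriber<Double> {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // ask for the first item
    }

    @Override
    public void onNext(Double temperature) {
        System.out.println("Temperature: " + temperature);
        subscription.request(1); // ask for the next item
    }

    @Override
    public void onError(Throwable throwable) {
        // Assumed behavior: report the failure. No further items arrive after this call.
        System.err.println("Temperature stream failed: " + throwable);
    }

    @Override
    public void onComplete() {
        // Assumed behavior: note that the publisher has finished.
        System.out.println("No more temperatures");
    }
}
```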
Implementing a Publisher
Next, let's build a publisher. To make life easier, we'll use one that Java provides.
java.util.concurrent.SubmissionPublisher<T> is a publisher implementation that comes with the JDK. In addition to providing subscribe(Subscriber s), it also allows you to publish data to subscribers by calling submit(T message).
Let me show you how to use it!
As you saw, all I did was extend SubmissionPublisher. Creating a SubmissionPublisher is easy as it's a concrete class.
I could have also done:
SubmissionPublisher<Double> publisher = new SubmissionPublisher<>();
By extending it, though, you can implement special behavior. Bear in mind that SubmissionPublisher is not part of the Reactive Streams specification. It's a class implemented in Java that lets us use Reactive Streams. This means that you won't find it in the other frameworks that you'd use for production.
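As one example of the kind of special behavior a subclass could add, here is a sketch of a publisher that refuses impossible readings. The class name ValidatingTemperaturePublisher, the submitIfPlausible() method, and the demo() helper are all hypothetical and are not the TemperaturePublisher from the video.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical subclass: only publishes readings at or above absolute zero.
class ValidatingTemperaturePublisher extends SubmissionPublisher<Double> {
    static final double ABSOLUTE_ZERO_CELSIUS = -273.15;

    // Returns true if the reading was published, false if it was rejected.
    boolean submitIfPlausible(double celsius) {
        if (Double.isNaN(celsius) || celsius < ABSOLUTE_ZERO_CELSIUS) {
            return false;
        }
        submit(celsius);
        return true;
    }

    // Illustrative helper: publishes three readings and returns what subscribers saw.
    static List<Double> demo() throws Exception {
        List<Double> received = new CopyOnWriteArrayList<>();
        ValidatingTemperaturePublisher publisher = new ValidatingTemperaturePublisher();
        CompletableFuture<Void> done = publisher.consume(received::add);
        publisher.submitIfPlausible(21.5);
        publisher.submitIfPlausible(-500.0); // rejected, never reaches subscribers
        publisher.submitIfPlausible(-40.0);
        publisher.close();
        done.get(10, TimeUnit.SECONDS);
        return received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints [21.5, -40.0]
    }
}
```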
To start publishing to the subscriber, we created an instance of our subscriber and passed it to subscribe:
TemperatureSubscriber subscriber = new TemperatureSubscriber();
TemperaturePublisher publisher = new TemperaturePublisher();

// subscribe to the publisher
publisher.subscribe(subscriber);

IntStream.range(0, 220).forEach((n) -> {
    publisher.submit(Double.valueOf(n));
});

publisher.close();
Lines 1 to 2: Create a subscriber and a publisher.
Line 5: Subscribe to the publisher with the subscriber.
Line 7: Send 220 numbers (0 to 219) to the publisher.
Line 11: Stop the publisher by calling .close().
As you saw, Flow stops when your main thread terminates. That is, the program can finish before Flow has done its job. The code went straight to Line 11, without blocking. That's fantastic non-blocking behavior, but you still need the code to run to completion. As you saw in the video, I had to manage that by blocking the main thread with either a while loop or a CountDownLatch.
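The CountDownLatch approach can be sketched as follows. This is not the exact code from the video: the class names, the publishAndWait() helper, and the 10 second timeout are assumptions chosen to keep the example self-contained.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class LatchedFlowExample {

    // Hypothetical subscriber that releases a latch once the stream has ended.
    static class CountingSubscriber implements Flow.Subscriber<Double> {
        private final CountDownLatch done;
        private final AtomicInteger received = new AtomicInteger();
        private Flow.Subscription subscription;

        CountingSubscriber(CountDownLatch done) {
            this.done = done;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(Double temperature) {
            received.incrementAndGet();
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
            done.countDown(); // release the waiting thread on failure too
        }

        @Override
        public void onComplete() {
            done.countDown(); // everything has been delivered
        }

        int received() {
            return received.get();
        }
    }

    static int publishAndWait(int count) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        CountingSubscriber subscriber = new CountingSubscriber(done);
        SubmissionPublisher<Double> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        IntStream.range(0, count).forEach(n -> publisher.submit(Double.valueOf(n)));
        publisher.close();
        // Without this wait, main could return before the subscriber has drained its buffer.
        if (!done.await(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Timed out waiting for the subscriber");
        }
        return subscriber.received();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Received " + publishAndWait(220) + " temperatures");
    }
}
```

Counting down in onError() as well as onComplete() is a design choice here: it stops the main thread from waiting for the full timeout when the stream fails.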
As easy as that was, let's check out a more powerful framework which conforms to the Reactive Streams specification. In fact, this is one which helped shape it. RxJava.
Working with RxJava 2
RxJava 2 is part of the ReactiveX (Reactive Extensions) family, which provides similar reactive libraries for many languages, including Java. The RxJava team were collaborators in shaping the Reactive Streams specification, and RxJava 2 was rewritten from scratch on top of it.
What does this mean?
Well, it means that you can easily port, or convert, your code from Flow to RxJava.
Why would we not use Flow?
Flow is a service provider interface (SPI), intended as a demonstration or as an API for other frameworks. RxJava is one such framework, and you'll see how to move your code over. RxJava also builds in backpressure and gives you several schemes to choose from, so you decide what happens to messages when your consumers can't keep up.
Let's dive in!
Let's break this down!
Step 1: Creating an RxJava subscriber involved swapping two classes:
We used org.reactivestreams.Subscription instead of Flow.Subscription.
We used org.reactivestreams.Subscriber instead of Flow.Subscriber.
The rest of our code was the same!
We could have gone even further and removed the CountDownLatch, as RxJava streams stay up while there are still unprocessed messages in a subscription.
Step 2: RxJava does not have a SubmissionPublisher, but instead, it comes with the Flowable class. This has lots of static methods for creating all sorts of publishers.
To just publish a bunch of values, I used the Flowable.just() method and passed it several doubles for the subscriber. This allowed us to create a stream in place with a bunch of values:
import io.reactivex.Flowable;
public class RxJavaPlanetaryAveragingApp {
    public static void main(String[] args) {
        RxJavaTemperatureSubscriber temperatureSubscriber
            = new RxJavaTemperatureSubscriber();
        // From some values
        Flowable.just(1.0, 2.0, 3.0).subscribe(temperatureSubscriber);
    }
}
As you can see here, we chained .subscribe() onto the just() publisher.
Step 3: I then showed you how you could create a publisher from all sorts of objects. First, we used fromIterable() to publish data from a normal Java stream. To make this work, we passed it the iterator from the stream.
// from a stream
IntStream stream = IntStream.range(0, 100);
Flowable.fromIterable(stream::iterator).onBackpressureBuffer()
    .map((intValue) -> Double.valueOf(intValue))
    .subscribe(temperatureSubscriber);
Line 2: Create an IntStream of 100 values.
Line 3: We do two things here:
Create a publisher using Flowable.fromIterable(), passing it the iterator from the stream.
Call onBackpressureBuffer() to ensure that all data sent to subscribers is kept in memory until subscribers receive it.
Look at the RxJava Flowable documentation to see the different schemes I mentioned in the screencast.
Line 4: In an approach similar to a normal stream, you can use a map operator from Flowable, which becomes another inline subscriber with a lambda. How easy is that!? 😄
Line 5: Subscribe to the output from this stream using a normal subscribe call. Even better, this didn't need any latches or additional blocking.
If all you want to do is publish a range of numbers, you can use another one of Flowable's many operators. These are methods that do all sorts of things. This time, we'll use Flowable.range(), which takes a starting number n and a count t of sequential numbers to publish.
// from Rx
Flowable.range(300, 400).onBackpressureBuffer()
    .map((intValue) -> Double.valueOf(intValue))
    .subscribe(temperatureSubscriber);
This publishes t numbers, from n to n+t-1 (here, 300 to 699). And that's all there is to it! 💪
Would these solutions be considered reactive systems?
These are simple examples, but they are also responsive, resilient, scalable, and reliable. The true answer is in how it scales. If your range operator was a file pushing through hundreds of millions of temperatures, this software would probably cope with it. The slowest step would be the subscriber, which prints that value to the screen.
Admittedly, printing temperatures to the screen is probably not very useful unless you're an android working for NASA with a positronic brain and can do something useful with numbers on a screen. The good news is that you can quickly introduce a new subscriber to do something completely different with that data, and it won't be impacted by the performance of your screen printing subscriber.
You could elastically scale to request more data across more subscribers or send errors to an error subscriber. These are all the makings of a reactive system.
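Adding a second subscriber alongside the screen printing one might look like the sketch below, written against the JDK's SubmissionPublisher rather than RxJava. The class name, the printAndAverage() helper, and the choice of averaging as the "something completely different" are assumptions for illustration.

```java
import java.util.DoubleSummaryStatistics;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

class TwoSubscribersExample {

    // Publishes 0..count-1 to a printing subscriber and an averaging subscriber.
    static double printAndAverage(int count) throws Exception {
        DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
        SubmissionPublisher<Double> publisher = new SubmissionPublisher<>();

        // consume() wraps a lambda in a subscriber; each one gets its own buffer.
        CompletableFuture<Void> printer =
                publisher.consume(t -> System.out.println("Temperature: " + t));
        CompletableFuture<Void> averager = publisher.consume(stats::accept);

        IntStream.range(0, count).forEach(n -> publisher.submit(Double.valueOf(n)));
        publisher.close();

        // Wait for both subscribers to finish before reading the result.
        CompletableFuture.allOf(printer, averager).get(10, TimeUnit.SECONDS);
        return stats.getAverage();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Average: " + printAndAverage(50)); // prints Average: 24.5
    }
}
```

Every subscriber receives every item, and each works through its own buffer at its own pace. One caveat with SubmissionPublisher specifically: submit() blocks once any subscriber's buffer is full, so a very slow subscriber can still hold the publisher back.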
Try It Out for Yourself!
Step 1: Git clone or fork the following repository:
git clone https://github.com/OpenClassrooms-Student-Center/Scale-up-your-code-with-Java-concurrencyB.git
You're all set to explore further and learn more about Reactive Programming and how you can use it to create fast and scalable concurrent applications!
Step 2: Go into the folder and run the Flow application. Also, check out the code and feel free to tinker with and change it. It's all in Git, so it's safe to play with.
We'll use the runSimpleFlowExample Gradle task to run it.
./gradlew runSimpleFlowExample
Step 3: Now run the RxJava version with the runSimpleRxJavaExample Gradle task:
./gradlew runSimpleRxJavaExample
Also, have a play with this one. 🤓
Let's Recap!
Flow is a service provider interface (SPI) that can be used to prototype solutions.
RxJava 2 is a Reactive Extensions library that handles backpressure well.
In the next chapter, let's review what you've learned in this course!