RxJava: flatMap() vs. concatMap() vs. concatMapEager()
On the face of it, flatMap, concatMap, and concatMapEager do largely the same thing. But let's look under the hood to see which works best in a given situation.
Join the DZone community and get the full member experience.
Join For FreeThere are three seamlessly similar operators in RxJava 2.x: flatMap(), concatMap(), and concatMapEager(). All of them accept the same argument — a function from the original stream's individual item to a (sub-)stream of an arbitrary type. In other words, if you have a Flowable<T>, you provide a function from T to Flowable<R> for an arbitrary R type. After applying any of these operators, you end up with Flowable<R>. So how are they different?
Sample Project
First, let's build a sample application. We will use a Retrofit2 HTTP client wrapper that has built-in plugins for RxJava2. Our task is to leverage the GeoNames API in order to find the population of any city in the world. The interface looks as follows:
public interface GeoNames {
Flowable<Long> populationOf(String city);
}
The implementation of this interface is auto-generated by Retrofit. Scroll down to see the glue source code. For the time being, just assume we have a function that takes a String with a city name and asynchronously returns a one-element stream with a population of that city. Also assume that we have a fixed stream of cities we want to look up:
Flowable<String> cities = Flowable.just(
"Warsaw", "Paris", "London", "Madrid"
);
Our goal is to fetch the population of each city.
concatMap(): Process Upstream Sequentially
The sample application with concatMap() looks as follows:
cities
.concatMap(geoNames::populationOf)
.subscribe(response -> log.info("Population: {}", response));
Before we see the outcome, let's study what concatMap() is doing underneath. For each upstream event (city) it invokes a function that replaces that event with a (sub)stream. In our case, it's a one-element stream of Long (Flowable<Long>). So with all the operators we are comparing, we end up with a stream of streams of Long (Flowable<Flowable<Long>>). The real difference arises when we analyze what the operator is doing in order to flatten such a nested stream.
concatMap() will first subscribe to the very first substream (Flowable<Long> representing the population of Warsaw). By subscribing, we actually mean making the physical HTTP call. Only when the first substream completes (emits a single Long in our case and signals completion) will concatMap() continue. Continuing means subscribing to the second substream and waiting for it to complete. The resulting stream completes when the very last substream completes. This leads to the following stream: 1702139, 2138551, 7556900, and 3255944. These happen to be populations of Warsaw, Paris, London, and Madrid, accordingly. The order of output is entirely predictable. However, it's also entirely sequential. No concurrency happens at all. We make the second HTTP call only when the first one is completed. The added complexity of RxJava doesn't pay off at all:
23:33:33.531 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
23:33:33.656 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (123ms)
23:33:33.674 | Rx-1 | Population: 1702139
23:33:33.676 | Rx-1 | --> GET .../searchJSON?q=Paris http/1.1
23:33:33.715 | Rx-1 | <-- 200 OK .../searchJSON?q=Paris (38ms)
23:33:33.715 | Rx-1 | Population: 2138551
23:33:33.716 | Rx-1 | --> GET .../searchJSON?q=London http/1.1
23:33:33.754 | Rx-1 | <-- 200 OK .../searchJSON?q=London (37ms)
23:33:33.754 | Rx-1 | Population: 7556900
23:33:33.755 | Rx-1 | --> GET .../searchJSON?q=Madrid http/1.1
23:33:33.795 | Rx-1 | <-- 200 OK .../searchJSON?q=Madrid (40ms)
23:33:33.796 | Rx-1 | Population: 3255944
As you can see, no multithreading occurs. Requests are sequential, waiting for each other. Technically, not all of them must happen in the same thread, but they never overlap and take advantage of concurrency. The big plus is the guaranteed order of resulting events, which is not that obvious once we jump into flatMap()...
flatMap(): Processing Results On-the-Fly, Out-of-Order
flatMap() code is almost exactly the same:
cities
.flatMap(geoNames::populationOf)
.subscribe(response -> log.info("Population: {}", response));
And just like before, we start with a stream of streams of Long (Flowable<Flowable<Long>>). However, rather than subscribing to each substream one after another, the flatMap() operator eagerly subscribes to all substreams at once. This means we see multiple HTTP requests being initiated at the same time in different threads:
00:10:04.919 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
00:10:04.919 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
00:10:04.919 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
00:10:04.919 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
00:10:05.449 | Rx-3 | <-- 200 OK .../searchJSON (529ms)
00:10:05.462 | Rx-3 | Population: 7556900
00:10:05.477 | Rx-1 | <-- 200 OK .../searchJSON (557ms)
00:10:05.478 | Rx-1 | Population: 1702139
00:10:05.751 | Rx-4 | <-- 200 OK .../searchJSON (831ms)
00:10:05.752 | Rx-4 | Population: 3255944
00:10:05.841 | Rx-2 | <-- 200 OK .../searchJSON (922ms)
00:10:05.843 | Rx-2 | Population: 2138551
When any of the underlying substreams emit any value, it is immediately passed downstream to the subscriber. This means we can now process events on-the-fly, as they are produced. Notice that the resulting stream is out of order. The first event we received is 7556900, which happens to be the population of London, second in the initial stream. Contrary to concatMap(), flatMap() can't preserve order, thus emitting values in a "random" order. Well, not really random, we simply receive values as soon as they are available. In this particular execution, the HTTP response for London came first, but there is absolutely no guarantee for that. This leads to an interesting problem. We have a stream of various population values and the initial stream of cities. However, the output stream can be an arbitrary permutation of events and we have no idea which population corresponds to which city. We will address this problem in a subsequent article.
concatMapEager(): Concurrent, In-Order, but Somewhat Expensive
concatMapEager() seems to bring the best of both worlds: concurrency and guaranteed order of output events:
cities
.concatMapEager(geoNames::populationOf)
.subscribe(response -> log.info("Population: {}", response));
After learning what concatMap() and flatMap() are doing, understanding concatMapEager() is fairly simple. Having a stream of streams, concatMapEager() eagerly (duh!) subscribes to all substreams at the same time, concurrently. However, this operator makes sure that results from the first substream are propagated first, even if it's not the first one to complete. An example will quickly reveal what this means:
00:34:18.371 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
00:34:18.371 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
00:34:18.371 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
00:34:18.371 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
00:34:18.517 | Rx-3 | <-- 200 OK .../searchJSON?q=London (143ms)
00:34:18.563 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (189ms)
00:34:18.565 | Rx-1 | Population: 1702139
00:34:20.460 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (2086ms)
00:34:20.460 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (2086ms)
00:34:20.461 | Rx-2 | Population: 2138551
00:34:20.462 | Rx-2 | Population: 7556900
00:34:20.462 | Rx-2 | Population: 3255944
We initiate four HTTP requests instantly. From the log output, we clearly see that the population of London was returned first. However, the subscriber did not receive it because the population of Warsaw didn't arrive yet. By coincidence, Warsaw completed second so at this point, the population of Warsaw can be passed downstream to a subscriber. Unfortunately, the population of London must wait even longer because, first, we need a population of Paris. Once Paris (immediately followed by Madrid) completes, all remaining results are passed downstream.
Notice how the population of London, even though available, must wait dormant until Warsaw and Paris complete? So is concatMapEager() the best possible operator for concurrency? Not quite. Imagine we have a list of a thousand cities and for each one, we fetch a single 1MB picture. With concatMap(), we download pictures sequentially, i.e. slowly. With flatMap(), pictures are downloaded concurrently and processed as they arrive, as soon as possible. Now, what about concatMapEager()? In the worst case scenario, we can end up with concatMapEager() buffering 999 pictures because pictures from the very first city happen to be the slowest. Even though we already have 99.9% of the results, we cannot process them because we enforce strict ordering.
Which Operator to Use?
flatMap() should be your first weapon of choice. It allows efficient concurrency with streaming behavior. But be prepared to receive results out-of-order. concatMap() works well only when provided transformation is so fast that the sequential processing is not a problem. concatMapEager() is very convenient, but watch out for memory consumption. Also, in the worst case scenario, you may end up sitting idle, waiting for very few responses.
Appendix: Configuring a Retrofit2 Client
The GeoNames service interface that we used throughout this article, in fact, looks like this:
public interface GeoNames {
@GET("/searchJSON")
Single<SearchResult> search(
@Query("q") String query,
@Query("maxRows") int maxRows,
@Query("style") String style,
@Query("username") String username
);
default Flowable<Long> populationOf(String city) {
return search(city, 1, "LONG", "s3cret")
.map(SearchResult::getGeonames)
.map(g -> g.get(0))
.map(Geoname::getPopulation)
.toFlowable();
}
}
The implementation of the non-default method is auto-generated by Retrofit2. Notice that populationOf() returns a one-element Flowable<Long> for simplicity's sake. However, to fully embrace the nature of this API, other implementations would be more reasonable in the real world. First of all, the SearchResult class returns an ordered list of results (getters/setters omitted):
class SearchResult {
private List<Geoname> geonames = new ArrayList<>();
}
class Geoname {
private double lat;
private double lng;
private Integer geonameId;
private Long population;
private String countryCode;
private String name;
}
After all, there are many Warsaws and Londons in the world. We silently assume the list will contain at least one element and the first one is the right match. A more appropriate implementation should either return all hits or, even better, a Maybe<Long> type to reflect no matches:
default Maybe<Long> populationOf(String city) {
return search(city, 1, "LONG", "nurkiewicz")
.flattenAsFlowable(SearchResult::getGeonames)
.map(Geoname::getPopulation)
.firstElement();
}
The glue code looks as follows. First, here's Jackson's setup in order to parse responses from the API:
import com.fasterxml.jackson.databind.ObjectMapper;
private ObjectMapper objectMapper() {
return new ObjectMapper()
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
}
FAIL_ON_UNKNOWN_PROPERTIES is often what you desire. Otherwise, you have to map all fields from a JSON response and your code will break when the API producer introduces new, otherwise backward compatible fields. Then we set up OkHttpClient, used underneath by Retrofit:
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
private OkHttpClient client() {
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
return new OkHttpClient.Builder().addInterceptor(interceptor).build();
}
Sometimes, you can skip the configuration of OkHttp client but we added a logging interceptor. By default, OkHttp logs using java.util.logging, so in order to use a decent logging framework, we must install a bridge at the very beginning:
import org.slf4j.bridge.SLF4JBridgeHandler;
static {
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
}
And finally, Retrofit itself:
import io.reactivex.schedulers.Schedulers;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.jackson.JacksonConverterFactory;
GeoNames createClient() {
return new Retrofit.Builder()
.client(client())
.baseUrl("http://api.geonames.org")
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
.addConverterFactory(JacksonConverterFactory.create(objectMapper()))
.build()
.create(GeoNames.class);
}
Calling createClient() will yield a dynamic implementation of the GeoNames interface. We used the following dependencies:
compile 'io.reactivex.rxjava2:rxjava:2.0.6'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.retrofit2:converter-jackson:2.0.1'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.0'
compile 'ch.qos.logback:logback-classic:1.1.7'
compile 'org.slf4j:slf4j-api:1.7.21'
compile 'org.slf4j:jul-to-slf4j:1.7.21'
Published at DZone with permission of Tomasz Nurkiewicz, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments