RxJava FAQ: Loading Files With Backpressure
This guide will help you read large files, XML in this case, while managing resources efficiently by using RxJava Flowables.
Processing files as a stream turns out to be tremendously effective and convenient. Many people seem to forget that since Java 8 (3+ years!), we have been able to very easily turn any file into a stream of lines:
String filePath = "foobar.txt";
try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
    reader.lines()
            .filter(line -> !line.startsWith("#"))
            .map(String::toLowerCase)
            .flatMap(line -> Stream.of(line.split(" ")))
            .forEach(System.out::println);
}
reader.lines() returns a Stream<String>, which you can further transform. In this example, we discard lines starting with "#" and explode each line by splitting it into words. This way, we achieve a stream of words as opposed to a stream of lines. Working with text files is almost as simple as working with normal Java collections. In RxJava we already learned about the generate() operator. It can be used here as well to create a robust stream of lines from a file:
Flowable<String> file = Flowable.generate(
        () -> new BufferedReader(new FileReader(filePath)),
        (reader, emitter) -> {
            final String line = reader.readLine();
            if (line != null) {
                emitter.onNext(line);
            } else {
                emitter.onComplete();
            }
        },
        reader -> reader.close()
);
The generate() operator in the example above is a little bit more complex. The first argument is a state factory. Every time someone subscribes to this stream, the factory is invoked and a stateful BufferedReader is created. Then, when downstream operators or subscribers wish to receive some data, the second lambda (with two parameters) is invoked. This lambda expression tries to pull exactly one line from the file and either sends it downstream (onNext()) or completes when the end of the file is encountered. It's fairly straightforward. The third, optional argument to generate() is a lambda expression that can do some cleanup with the state. It's very convenient in our case, as we have to close the file not only when EOF is reached, but also when consumers prematurely unsubscribe.
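For completeness, here is a minimal sketch of how such a stream might be consumed. The filter() and take(10) below are illustrative only, not part of the original example, but they show that the reader is pulled for just as many lines as downstream actually requests:
file
        .filter(line -> !line.startsWith("#"))     // same filtering as the JDK 8 example
        .take(10)                                  // request only a handful of lines
        .subscribe(
                System.out::println,               // onNext: print each line
                Throwable::printStackTrace,        // onError: e.g. the file does not exist
                () -> System.out.println("Done")); // onComplete: the reader is already closed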
Meet Flowable.using() Operator
This seems like a lot of work, especially when we already have a stream of lines from JDK 8. It turns out that there is a similar factory operator named using() that is quite handy. First of all, the simplest way of translating a Java Stream into a Flowable is by converting the Stream to an Iterator (checked exception handling ignored):
Flowable.fromIterable(new Iterable<String>() {
    @Override
    public Iterator<String> iterator() {
        final BufferedReader reader = new BufferedReader(new FileReader(filePath));
        final Stream<String> lines = reader.lines();
        return lines.iterator();
    }
});
This can be simplified to:
Flowable.<String>fromIterable(() -> {
    final BufferedReader reader = new BufferedReader(new FileReader(filePath));
    final Stream<String> lines = reader.lines();
    return lines.iterator();
});
But we forgot about closing the BufferedReader, and therefore the FileReader and the underlying file handle, so we introduced a resource leak. Under such circumstances, the using() operator works like a charm. In a way, it's similar to a try-with-resources statement: you create a stream based on some external resource, and the lifecycle of this resource (creation and disposal) is managed for you when someone subscribes or unsubscribes:
Flowable.using(
        () -> new BufferedReader(new FileReader(filePath)),
        reader -> Flowable.fromIterable(() -> reader.lines().iterator()),
        reader -> reader.close()
);
It's fairly similar to the last generate() example; however, the most important lambda expression, the one in the middle, is quite different. We get a resource (the reader) as an argument and are supposed to return a Flowable (not a single element). This lambda is called only once, not every time downstream requests a new item. What the using() operator gives us is management of the BufferedReader's lifecycle. using() is useful when we have a piece of state (just like with generate()) that is capable of producing the whole Flowable at once, as opposed to one item at a time.
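To see this resource management in action, assume the Flowable above is assigned to a variable called lines (the name is mine, not from the snippet). Cancelling early, for example with take(3), is enough to trigger the disposal lambda and close the reader:
lines
        .take(3)                          // cancels the upstream after three lines
        .subscribe(System.out::println);  // the BufferedReader is closed on cancellation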
Streaming XML Files
...or JSON for that matter. Imagine you have a very large XML file that consists of the following entries, hundreds of thousands of them:
<trkpt lat="52.23453" lon="21.01685">
    <ele>116</ele>
</trkpt>
<trkpt lat="52.23405" lon="21.01711">
    <ele>116</ele>
</trkpt>
<trkpt lat="52.23397" lon="21.0166">
    <ele>116</ele>
</trkpt>
This is a snippet from the standard GPS Exchange Format, which can describe geographical routes of arbitrary length. Each <trkpt> is a single point with latitude, longitude, and elevation. We would like to have a stream of track points (ignoring elevation for simplicity) so that the file can be consumed partially, as opposed to loading everything at once. We have three choices:
- DOM/JAXB: Everything must be loaded into memory and mapped to Java objects. Won't work for infinitely long files (or even very large ones).
- SAX: A push-based library that invokes callbacks whenever it discovers an XML tag opening or closing. Seems a bit better, but can't possibly support backpressure — it's the library that decides when to invoke callbacks and there is no way of slowing it down.
- StAX: Like SAX, but we must actively pull data from the XML file. This is essential to support backpressure — we decide when to read the next chunk of data.
Let's try to implement parsing and streaming of a possibly very large XML file using StAX and RxJava. First, we must learn how to use StAX in the first place. The parser is called XMLStreamReader and is created with the following sequence of spells and curses:
XMLStreamReader staxReader(String name) throws XMLStreamException, FileNotFoundException {
    final InputStream inputStream = new BufferedInputStream(new FileInputStream(name));
    return XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
}
Just close your eyes and make sure you always have a place to copy-paste the snippet above from. It gets even worse. In order to read the first <trkpt> tag, including its attributes, we must write quite a bit of complex code:
import lombok.Value;

@Value
class Trackpoint {
    private final BigDecimal lat;
    private final BigDecimal lon;
}
Trackpoint nextTrackpoint(XMLStreamReader r) throws XMLStreamException {
    while (r.hasNext()) {
        int event = r.next();
        switch (event) {
            case XMLStreamConstants.START_ELEMENT:
                if (r.getLocalName().equals("trkpt")) {
                    return parseTrackpoint(r);
                }
                break;
            case XMLStreamConstants.END_ELEMENT:
                if (r.getLocalName().equals("gpx")) {
                    return null;
                }
                break;
        }
    }
    return null;
}

Trackpoint parseTrackpoint(XMLStreamReader r) {
    return new Trackpoint(
            new BigDecimal(r.getAttributeValue("", "lat")),
            new BigDecimal(r.getAttributeValue("", "lon"))
    );
}
The API is quite low-level and almost adorably antique. Everything happens in a gigantic loop that reads... something of type int. This int can be START_ELEMENT, END_ELEMENT, or a few other things that we are not interested in. Remember, we are reading an XML file, but not line-by-line or char-by-char — by logical XML tokens (tags). So, if we discover the opening of a <trkpt> element, we parse it; otherwise, we continue. The second important condition is when we find a closing </gpx>, which should be the last thing in a GPX file. We return null in that case, signaling the end of the XML file.
Feels complex? This is actually the simplest way to read large XML files with constant memory usage, irrespective of file size. How does all of this relate to RxJava? At this point, we can very easily build a Flowable<Trackpoint>. Flowable, not Observable (see: Flowable vs. Observable). Such a stream will have full support for backpressure, meaning it will read the file at the appropriate speed:
Flowable<Trackpoint> trackpoints = Flowable.generate(
        () -> staxReader("track.gpx"),
        this::pushNextTrackpoint,
        XMLStreamReader::close);

void pushNextTrackpoint(XMLStreamReader reader, Emitter<Trackpoint> emitter) throws XMLStreamException {
    final Trackpoint trkpt = nextTrackpoint(reader);
    if (trkpt != null) {
        emitter.onNext(trkpt);
    } else {
        emitter.onComplete();
    }
}
Wow, so simple, such backpressure! We first create an XMLStreamReader and make sure it's closed when the file ends or when someone unsubscribes. Remember that each subscriber will open and start parsing the same file over and over again. The lambda expression in the middle simply takes the state variable (the XMLStreamReader) and emits one more trackpoint. All of this seems quite obscure, and it is! But we now have a backpressure-aware stream taken from a possibly very large file using very few resources. We can process the trackpoints concurrently or combine them with other sources of data. In the next article, we will learn how to load JSON in a very similar way.
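As a closing illustration (the scheduler, buffer size, and map() step below are my own additions, not part of the original example), the trackpoints can be handed off to another thread with a small buffer, so that backpressure keeps file reads roughly in step with processing:
trackpoints
        .observeOn(Schedulers.computation(), false, 32)   // read at most 32 points ahead of the consumer
        .map(p -> p.getLat() + "," + p.getLon())          // hypothetical per-point processing
        .subscribe(System.out::println);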