Consuming java.util.concurrent.BlockingQueue as rx.Observable
An implementation of the producer-consumer pattern using BlockingQueues
Join the DZone community and get the full member experience.
Join For FreeClassical producer-consumer pattern is relatively simple in Java since we have java.util.concurrent.BlockingQueue. To avoid busy waiting and error-prone manual locking we simply take advantage of put() and take(). They both block if queue is full or empty respectively. All we need is a bunch of threads sharing reference to the same queue: some producing and others consuming. And of course the queue has to have a limited capacity, otherwise we will soon run out of memory in case of producers outperforming consumers. Greg Young couldn't emphasize enough this rule during Devoxx Poland:
Never, ever create an unbounded queue
Producer-consumer usingBlockingQueue
Here is a simplest example. First we need a producer that puts objects in a shared queue:
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Value
class Producer implements Runnable {
private final BlockingQueue<User> queue;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
final User user = new User("User " + System.currentTimeMillis());
log.info("Producing {}", user);
queue.put(user);
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
log.error("Interrupted", e);
}
}
}
Producer simply publishes an instance of User
class (whatever it is) to a given queue every second. Obviously in real life placing User
in a queue would be a result of some action within a system, like user login. Similarly consumer takes new items from a queue and processes them:
@Slf4j
@Value
class Consumer implements Runnable {
private final BlockingQueue<User> queue;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
final User user = queue.take();
log.info("Consuming: {}", user);
}
} catch (Exception e) {
log.error("Interrupted", e);
}
}
}
Again in real life processing would mean storing in database or running some fraud detection on a user. We use queue to decouple processing thread from consuming thread, e.g. to reduce latency. To run a simple test let's spin up few producer and consumer threads:
BlockingQueue<User> queue = new ArrayBlockingQueue<>(1_000);
final List<Runnable> runnables = Arrays.asList(
new Producer(queue),
new Producer(queue),
new Consumer(queue),
new Consumer(queue),
new Consumer(queue)
);
final List<Thread> threads = runnables
.stream()
.map(runnable -> new Thread(runnable, threadName(runnable)))
.peek(Thread::start)
.collect(toList());
TimeUnit.SECONDS.sleep(5);
threads.forEach(Thread::interrupt);
//...
private static String threadName(Runnable runnable) {
return runnable.getClass().getSimpleName() + "-" + System.identityHashCode(runnable);
}
We have 2 producers and 3 consumers, everything seems to be working. In real life you would probably have some implicit producer threads, like HTTP request handling threads. On the consumer side you would most likely use a thread pool. This pattern works well, but especially the consuming side is quite low-level.
Introducing ObservableQueue<T>
The purpose of this article is to introduce an abstraction that behaves like a queue from producer side but as an Observable from RxJava on consumer side. In other words we can treats objects added to a queue as a stream that we can map, filter, compose, etc. on the client side. Interestingly, this is no longer a queue under the hood. ObservableQueue<T> simply forwards all new objects straight to subscribed consumers and doesn't buffer events in case of no-one listening ("hot" observable).
ObservableQueue<T> is not a queue per-se, it's just a bridge between one API and the other. It's similar to java.util.concurrent.SynchronousQueue, but if no-one is interested in consuming, object is simply discarded.
Here is a first experimental implementation. It's just a toy code, do not consider it production ready. Also we'll greatly simplify it later:
public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {
private final Set<Subscriber<? super T>> subscribers = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Observable<T> observable = Observable.create(subscriber -> {
subscriber.add(new Subscription() {
@Override
public void unsubscribe() {
subscribers.remove(subscriber);
}
@Override
public boolean isUnsubscribed() {
return false;
}
});
subscribers.add(subscriber);
});
public Observable<T> observe() {
return observable;
}
@Override
public boolean add(T t) {
return offer(t);
}
@Override
public boolean offer(T t) {
subscribers.forEach(subscriber -> subscriber.onNext(t));
return true;
}
@Override
public T remove() {
return noSuchElement();
}
@Override
public T poll() {
return null;
}
@Override
public T element() {
return noSuchElement();
}
private T noSuchElement() {
throw new NoSuchElementException();
}
@Override
public T peek() {
return null;
}
@Override
public void put(T t) throws InterruptedException {
offer(t);
}
@Override
public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
return offer(t);
}
@Override
public T take() throws InterruptedException {
throw new UnsupportedOperationException("Use observe() instead");
}
@Override
public T poll(long timeout, TimeUnit unit) throws InterruptedException {
return null;
}
@Override
public int remainingCapacity() {
return 0;
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean containsAll(Collection<?> c) {
return false;
}
@Override
public boolean addAll(Collection<? extends T> c) {
c.forEach(this::offer);
return true;
}
@Override
public boolean removeAll(Collection<?> c) {
return false;
}
@Override
public boolean retainAll(Collection<?> c) {
return false;
}
@Override
public void clear() {
}
@Override
public int size() {
return 0;
}
@Override
public boolean isEmpty() {
return true;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public Iterator<T> iterator() {
return Collections.emptyIterator();
}
@Override
public Object[] toArray() {
return new Object[0];
}
@Override
public <T> T[] toArray(T[] a) {
return a;
}
@Override
public int drainTo(Collection<? super T> c) {
return 0;
}
@Override
public int drainTo(Collection<? super T> c, int maxElements) {
return 0;
}
@Override
public void close() throws IOException {
subscribers.forEach(rx.Observer::onCompleted);
}
}
There are couple of interesting facts about it:
- We must keep track of all subscribers, i.e. consumers that are willing to receive new items. If one of the subscribers is no longer interested, we must remove such subscriber, otherwise memory leak will occur (keep reading!)
- This queue behaves as if it was always empty. It never holds any items - when you put something into this queue, it is automatically passed to subscribers and forgotten
- Technically this queue is unbounded (!), meaning you can put as many items as you want. However since items are passed to all subscribers (if any) and immediately discarded, this queue is actually always empty (see above)
- Still it's possible that producer is generating too many events and consumers can't keep up with that - RxJava now has back pressure support, not covered in this article.
Producer can use ObservableQueue<T> just like any other BlockingQueue<T>, assuming I implemented queue contract correctly. However the consumer looks much lighter and smarter:
final ObservableQueue<User> users = new ObservableQueue<>();
final Observable<User> observable = users.observe();
users.offer(new User("A"));
observable.subscribe(user -> log.info("User logged in: {}", user));
users.offer(new User("B"));
users.offer(new User("C"));
Code above prints "B"
and "C"
only. "A"
is lost by design since ObservableQueue
drops items in case no one is listening. Obviously Producer
class now uses users
queue. Everything works fine, you can call users.observe()
at any point and apply one of dozens of Observable
operators. However there is one caveat: by default RxJava doesn't enforce any threading, so consuming happens in the same thread as producing! We lost the most important feature of producer-consumer pattern, i.e. thread decoupling. Luckily everything is declarative in RxJava, thread scheduling as well:
users
.observe()
.observeOn(Schedulers.computation())
.forEach(user ->
log.info("User logged in: {}", user)
);
Now let's see some real RxJava power. Imagine you want to count how many users log in per second, where each login is placed as an event into a queue:
users
.observe()
.map(User::getName)
.filter(name -> !name.isEmpty())
.window(1, TimeUnit.SECONDS)
.flatMap(Observable::count)
.doOnCompleted(() -> log.info("System shuts down"))
.forEach(c -> log.info("Logins in last second: {}", c));
The performance is also acceptable, such queue can accept around 3 million objects per second on my laptop with one subscriber. Treat this class as an adapter from legacy systems using queues to modern reactive world. But wait! Using ObservableQueue<T>
is easy, but the implementation with subscribers
synchronized set seems too low-level. Luckily there is Subject<T, T>
. Subject
is "the other side" of Observable
- you can push events to Subject
but it still implements Observable
, so you can easily create arbitrary Observable
. Look how beautifully ObservableQueue
looks like with one of theSubject
implementations:
public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {
private final Subject<T, T> subject = PublishSubject.create();
public Observable<T> observe() {
return subject;
}
@Override
public boolean add(T t) {
return offer(t);
}
@Override
public boolean offer(T t) {
subject.onNext(t);
return true;
}
@Override
public void close() throws IOException {
subject.onCompleted();
}
@Override
public T remove() {
return noSuchElement();
}
@Override
public T poll() {
return null;
}
@Override
public T element() {
return noSuchElement();
}
private T noSuchElement() {
throw new NoSuchElementException();
}
@Override
public T peek() {
return null;
}
@Override
public void put(T t) throws InterruptedException {
offer(t);
}
@Override
public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
return offer(t);
}
@Override
public T take() throws InterruptedException {
throw new UnsupportedOperationException("Use observe() instead");
}
@Override
public T poll(long timeout, TimeUnit unit) throws InterruptedException {
return null;
}
@Override
public int remainingCapacity() {
return 0;
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean containsAll(Collection<?> c) {
return false;
}
@Override
public boolean addAll(Collection<? extends T> c) {
c.forEach(this::offer);
return true;
}
@Override
public boolean removeAll(Collection<?> c) {
return false;
}
@Override
public boolean retainAll(Collection<?> c) {
return false;
}
@Override
public void clear() {
}
@Override
public int size() {
return 0;
}
@Override
public boolean isEmpty() {
return true;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public Iterator<T> iterator() {
return Collections.emptyIterator();
}
@Override
public Object[] toArray() {
return new Object[0];
}
@Override
public <T> T[] toArray(T[] a) {
return a;
}
@Override
public int drainTo(Collection<? super T> c) {
return 0;
}
@Override
public int drainTo(Collection<? super T> c, int maxElements) {
return 0;
}
}
The implementation above is much cleaner and we don't have to worry about thread synchronization at all.
Published at DZone with permission of Tomasz Nurkiewicz, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments