Distributed Java Queues on Top of Redis
Join the DZone community and get the full member experience.
Join For FreeUsing Queues in Redis
Redis is a powerful tool that supports many different types of data structures from Strings and lists to maps and streams. Developers use Redis as a database, a cache, and a message broker.
Like any message broker, Redis needs to send messages in the correct order. Messages may be sent according to their age or according to some other predefined priority ranking.
In order to store these pending messages, Redis developers need a queue data structure. Redisson is a framework for distributed programming with Redis and Java that provides implementations of many distributed data structures, including queues.
Redisson makes Redis development easier by providing a Java API. Instead of requiring developers to learn Redis commands, Redisson includes all the well-known Java interfaces, such as Queue and BlockingQueue. Redisson also handles the tedious behind-the-scenes work in Redis, such as connection management, failover handling, and data serialization.
You may also like: Building Microservices With Redis.
Redis-Based Distributed Java Queues
Redisson provides multiple Redis based implementations of the basic queue data structure in Java, each with a different functionality. This allows you to select the type of queue that is best suited for your purposes.
Below, we'll discuss six different types of Redis based distributed queues using the Redisson Java framework.
Queue
The RQueue
object in Redisson implements the java.util.Queue interface. Queues are used for situations in which elements should be processed beginning with the oldest ones first (also known as "first in, first out" or FIFO).
As with plain Java, the first element of the RQueue
can be examined with the peek()
method, or examined and removed with the poll()
method:
xxxxxxxxxx
RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
BlockingQueue
The RBlockingQueue
object in Redisson implements the java.util.BlockingQueue interface.
BlockingQueues are queues that block a thread attempting to poll from an empty queue, or attempting to insert an element in a queue that is full. The thread is blocked until another thread inserts an element into the empty queue, or polls the first element from the full queue.
The example code below demonstrates the proper instantiation and use of an RBlockingQueue
. In particular, you can call the poll()
method with arguments that specify how long the thread will wait for an element to become available:
xxxxxxxxxx
RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
During times of failover or reconnection to the Redis server, the poll()
, pollFromAny()
, pollLastAndOfferFirstTo()
, and take()
Java methods are resubscribed automatically.
BoundedBlockingQueue
The RBoundedBlockingQueue
object in Redisson implements a bounded blocking queue structure. Bounded blocking queues are blocking queues whose capacity has been bounded, i.e. limited.
The code below demonstrates how to instantiate and use an RBoundedBlockingQueue
in Redisson. The trySetCapacity()
method is used to attempt to set the capacity of the blocking queue. trySetCapacity()
returns the Boolean value "true" or "false", depending on whether the capacity was successfully set or whether it was already set:
xxxxxxxxxx
RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
queue.trySetCapacity(2);
queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// will be blocked until free space available in queue
queue.put(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
DelayedQueue
The RDelayedQueue
object in Redisson allows you to implement a delayed queue in Redis. This could be useful when delivering messages to consumers using a strategy such as exponential backoff. After each failed attempt to deliver a message, the time between retries will increase exponentially.
Each element in the delayed queue will be transferred to a destination queue after a delay that is specified together with the element. This destination queue may be any queue that implements the RQueue
interface, such as an RBlockingQueue
or RBoundedBlockingQueue
.
xxxxxxxxxx
RQueue<String> destinationQueue = redisson.getQueue("anyQueue");
RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);
// move object to destinationQueue in 10 seconds
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// move object to destinationQueue in 1 minute
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
It's a good idea to destroy the delayed queue by using the destroy() method after the queue is no longer needed. However, this is not necessary if you are shutting down Redisson.
PriorityQueue
The RPriorityQueue
object in Redisson implements the java.util.Queue interface. Priority queues are queues that are sorted not by the age of the element, but by the priority that is associated with each element.
As shown in the example code below, RPriorityQueue
uses a Comparator to sort the elements in the queue:
xxxxxxxxxx
RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // set object comparator
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.poll();
PriorityBlockingQueue
The RPriorityBlockingQueue
object in Redisson combines the functionalities of RPriorityQueue
and RBlockingQueue
. Like RPriorityQueue
, RPriorityBlockingQueue
uses a Comparator
to sort the elements in the queue.
xxxxxxxxxx
RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // set object comparator
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.take();
During times of failover or reconnection to the Redis server, the poll()
, pollLastAndOfferFirstTo()
, and take()
Java methods are resubscribed automatically.
Further Reading
Opinions expressed by DZone contributors are their own.
Comments