Java Concurrency: Condition
In this article, learn how to make threads wait on specific conditions by using the Condition interface.
Join the DZone community and get the full member experience.
Join For FreePreviously we checked on ReentRantLock
and its fairness. One of the things we can stumble upon is the creation of a Condition
. By using Condition
, we can create mechanisms that allow threads to wait for specific conditions to be met before proceeding with their execution.
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
The closest we came to that so far is the wait Object Monitor method.
A Condition
is bound to a Lock
and a thread cannot interact with a Condition
and its methods if it does not have a hold on that Lock
.
Also, Condition
uses the underlying lock
mechanisms. For example, signal
and signalAll
will use the underlying queue of the threads that are maintained by the Lock
, and will notify them to wake up.
One of the obvious things to implement using Conditions
is a BlockingQueue
. Worker threads process data and publisher threads dispatch data. Data are published on a queue, worker threads will process data from the queue, and then they should wait if there is no data in the queue.
For a worker thread, if the condition is met the flow is the following:
- Acquire the lock
- Check the condition
- Process data
- Release the lock
If the condition is not met, the flow would slightly change to this:
- Acquire the lock
- Check the condition
- Wait until the condition is met
- Re-acquire the lock
- Process data
- Release the lock
The publisher thread, whenever it adds a message, should notify the threads waiting on the condition.
The workflow would be like this:
- Acquire the lock
- Publish data
- Notify the workers
- Release the lock
Obviously, this functionality already exists through the BlockingQueue
interface and the LinkedBlockingDeque
and ArrayBlockingQueue
implementations.
We will proceed with an implementation for the sake of the example.
Let’s see the message queue:
package com.gkatzioura.concurrency.lock.condition;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MessageQueue<T> {
private Queue<T> queue = new LinkedList<>();
private Lock lock = new ReentrantLock();
private Condition hasMessages = lock.newCondition();
public void publish(T message) {
lock.lock();
try {
queue.offer(message);
hasMessages.signal();
} finally {
lock.unlock();
}
}
public T receive() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
hasMessages.await();
}
return queue.poll();
} finally {
lock.unlock();
}
}
}
Now let’s put it into action:
MessageQueue<String> messageQueue = new MessageQueue<>();
@Test
void testPublish() throws InterruptedException {
Thread publisher = new Thread(() -> {
for (int i = 0; i < 10; i++) {
String message = "Sending message num: " + i;
log.info("Sending [{}]", message);
messageQueue.publish(message);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread worker1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
String message = messageQueue.receive();
log.info("Received: [{}]", message);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread worker2 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
String message = messageQueue.receive();
log.info("Received: [{}]", message);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
publisher.start();
worker1.start();
worker2.start();
publisher.join();
worker1.join();
worker2.join();
}
That’s it! Our workers processed the expected messages and waited when the queue was empty.
Published at DZone with permission of Emmanouil Gkatziouras, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments