Java Scale-First ExecutorService: Myth or Reality?
Concurrency can be tricky, particularly during peak demand times, so here's how to move your apps from queue-first to scale-first.
Concurrent processing is no longer a nice-to-have quality but a must-have one in today’s software applications. Almost all modern programming languages provide their own semantics for it; in Java, concurrency is built mainly on JVM threads.
Therefore, Java provides many utilities to handle concurrency-related matters under its java.util.concurrent package. One of the most important utilities is an ExecutorService called ThreadPoolExecutor, which has the capability to accept asynchronous concurrent tasks and carry them out using a pool of threads that it maintains underneath. Most importantly, this ExecutorService takes care of the maintenance tasks of the thread pool such as spawning new threads and evicting idle ones, while developers only have to provide the parameters that control the size and scalability factors of the pool.
The three most important such parameters are the core pool size, maximum pool size, and the work queue. According to the Java documentation, the following are the definitions of those parameters.
corePoolSize: The number of threads to keep in the pool, even if they are idle.
maximumPoolSize: The maximum number of threads to allow in the pool.
workQueue: The queue to use for holding tasks before they are executed.
So let’s take an example scenario and see how the default Java thread pool executor will behave under a load of concurrent tasks.
If you are already aware of how the default thread pool executor works and you are here just to know whether scale-first is a myth or a reality, you can skip the next section and head onto the one after that.
How It Works
First of all, let’s configure an instance of Java’s ThreadPoolExecutor (referred to as the thread pool from here onwards) and observe how it behaves when a set of concurrent tasks is submitted. Assume that this thread pool is configured with a core size of 20, a max size of 100, and a bounded work queue of size 50, and that all the other parameters are kept at their default values.
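As a minimal sketch, the configuration described above could be written as follows. The class name DefaultPoolConfig, the helper newPool(), the choice of ArrayBlockingQueue as the bounded queue, and the 60-second keep-alive are assumptions made for illustration; the constructor requires a keep-alive value, and the article does not state which one was used.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DefaultPoolConfig {

    // Core size 20, max size 100, bounded queue of 50, as in the example.
    // The 60-second keep-alive is an assumed value, not taken from the article.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                20, 100,
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(50));
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        // No worker threads exist until the first task is submitted.
        System.out.println("threads at start: " + pool.getPoolSize());
        pool.shutdown();
    }
}
```

The thread factory and the rejection handler are left at the defaults supplied by this constructor.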
So when the thread pool is started, by default, it will start with no threads. When we submit the first task, the first worker thread will be spawned and the task will be handed over to that thread. As long as the current worker thread count is less than the configured core pool size, a new worker thread will be spawned for each newly submitted task, even though some of the previously created core threads may be in the idle state.
What will happen when the worker thread count reaches the core pool size? As soon as the number of worker threads reaches the core pool size (20 in our example), we can observe that the thread pool stops creating new worker threads. When we submit more and more tasks, one of the following behaviors can be observed.
Behavior 1
As long as there are any idle core threads (worker threads that have been created earlier but already finished their assigned tasks), they will take up the new tasks and execute them.
Behavior 2
If there are no idle core threads available, each new task will be enqueued into the defined work queue until a core thread becomes available to accept it.
If the work queue becomes full and there are still not enough idle core threads to handle the tasks, the thread pool resumes creating new worker threads, and the new tasks are carried out by them.
Once the worker thread count reaches the max pool size, the thread pool again stops creating new worker threads, and any task that is submitted after this point will be rejected.
In case you got lost in the above details, the following flow chart will be helpful:
Default behavior of Java ThreadPool ExecutorService
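The queue-first sequence can be observed on a deliberately tiny pool. The sketch below assumes a core size of 1, a max size of 2, and a queue capacity of 1, values chosen only to keep the output short; QueueFirstDemo and run() are hypothetical names. Every task blocks on a latch, so no worker becomes idle while tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueFirstDemo {

    // Submits four blocking tasks and records "threads/queued" after each one.
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocking);
                log.add(pool.getPoolSize() + "/" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                log.add("rejected");
            }
        }
        release.countDown(); // let the workers finish
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1/0, 1/1, 2/1, rejected]
    }
}
```

The second task is queued even though the pool is allowed to grow to two threads; the second thread appears only when the third task finds the queue full.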
The Problem
Of these behaviors, the first is completely acceptable, but the second is not. While it might be perfectly sensible for some use cases, it is not ideal for a system that expects sudden peaks and is able to scale up as soon as a peak hits, rather than waiting for the load to stabilize. One could argue that such a system simply needs more core threads, but the thread count under normal (non-peak) operation should stay moderate, because higher thread counts cause contention.
We at Adroitlogic also encountered this problem when we were designing Project-X, which is the core framework of our integration product stack. Since all our integration products are performance critical, we needed to get rid of this queue-first behavior of the thread pool executor and make it scale-first. After trying out several alternative approaches, we arrived at the following implementation, which brought the expected results.
The Solution
Before going into the implementation details, let’s see the expected behavior of a scale-first thread pool executor. A scale-first thread pool behaves like a typical thread pool until it reaches the core pool size. Once the core pool size is reached, it should first scale up by creating non-core threads instead of queuing the submitted tasks. Queuing should start only when the max pool size has been reached.
Behavior of Scale-First ExecutorService
Understanding the Implementation of the Current Behavior
So as the first step, let’s go through Java’s ThreadPoolExecutor class to find out how this queue-first behavior is implemented. The following code block from its execute() method looks interesting:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);
The first if block is responsible for creating a new core thread for the newly submitted task if the current worker thread count is less than the core pool size. Since we are fine with that behavior, let’s move to the next if block.
Simply put (ignoring some rechecks performed to preserve the consistency of the thread pool), the second if block invokes the offer() method of the work queue with the task to be executed.
By default, when the offer() method of a BlockingQueue is invoked, it tries to enqueue the provided element (the task in this case) and returns true if successful. If the queue does not have enough space left to store the element, it will simply return false.
So in this case, if the queue has enough space, the task will be queued. If there are any idle worker threads available, they will be looking at this queue, and one of them will quickly grab this task.
Otherwise, execution moves to the final else if block. That code block first tries to create a new worker thread to handle the task, and if that fails (because the max pool size limit has been reached), it simply rejects the task.
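The offer() contract described above is easy to check in isolation. The following is a minimal sketch using an ArrayBlockingQueue with room for a single element; OfferDemo and offerTwice() are hypothetical names, and the queue type is only one of several bounded BlockingQueue implementations that behave this way.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OfferDemo {

    // Returns the results of two offers to a queue with room for one element.
    static boolean[] offerTwice() {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
        boolean first = queue.offer(() -> { });  // space available
        boolean second = queue.offer(() -> { }); // queue already full
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        boolean[] results = offerTwice();
        System.out.println(results[0] + " " + results[1]); // true false
    }
}
```

It is this false return value, and nothing else, that sends execute() into its final else if branch.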
Modifying the Behavior of the offer() Method
From what we saw here, we can understand that by modifying the default behavior of the work queue’s offer() method, we can solve a major part of this issue. Conceptually, what we have to do is first check whether there are any idle worker threads in the pool. If there are any, we can try to enqueue the task so one of those threads can pick it up from there. Otherwise, we should return false from the offer() method, which makes the thread pool create a new worker thread for the task.
The following two diagrams show the default behavior of the offer() method and our modified behavior.
Default behavior of offer() method
Modified behavior of offer() method
So assuming that an AtomicInteger with the name currentIdleThreadCount contains the number of current idle threads, our new offer() method looks like below.
@Override
public boolean offer(Runnable e) {
    return currentIdleThreadCount.get() > 0 && super.offer(e);
}
But unfortunately, there is no straightforward way to get the current idle worker thread count of the thread pool without introducing a performance bottleneck for the work queue. So now we need to implement a way to keep track of that, too.
Keeping Track of the Idle Worker Thread Count
To implement this, we basically need to identify a point where the status of a worker thread changes from idle to busy or vice versa. If we go back to the code of the ThreadPoolExecutor class, we can see that the getTask() method performs exactly that. Each idle thread executes this method to acquire a new task to be executed, and the following code block plays an important part in that.
Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();
Just to be clear, the timed variable here basically indicates whether the thread pool is currently in a state where it can evict some of the idle threads. This can be true either if the thread pool has more worker threads than the core pool size or if the thread pool is configured to allow evicting idle core threads.
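The second condition can be made visible with allowCoreThreadTimeOut(true). The sketch below is an illustration under assumed values (a single core thread and a 50 ms keep-alive); CoreTimeoutDemo and coreThreadEvicted() are hypothetical names. Once the flag is set, even the core thread waits in the timed poll() and is evicted after staying idle for the keep-alive period.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class CoreTimeoutDemo {

    // Returns true if the single core thread is evicted after going idle.
    static boolean coreThreadEvicted() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // core threads may now time out too
        pool.execute(() -> { });           // spawns the core thread

        // Wait (up to 5 seconds) for the idle thread to time out and exit.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(5));
        }
        boolean evicted = pool.getPoolSize() == 0;
        pool.shutdown();
        return evicted;
    }

    public static void main(String[] args) {
        System.out.println("core thread evicted: " + coreThreadEvicted());
    }
}
```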
First of all, let’s consider the scenario when timed is false. In this case, the thread will call the take() method of the work queue. Any thread that enters the take() method is currently idle, so we can override this method in our work queue to increment currentIdleThreadCount at the beginning. Then we can call the actual take() method, which could result in one of the following two scenarios.
If there are no tasks in the queue, the thread will be blocked at this call until a task is available. So it is still in the idle state and our incremented counter value is correct.
If there is any task, then it will be returned by the method call. So now this thread is no longer idle and we can decrement our counter at this point.
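So our overridden take() method looks like below:
@Override
public Runnable take() throws InterruptedException {
    // A thread entering take() is idle until a task is handed to it.
    currentIdleThreadCount.incrementAndGet();
    try {
        return super.take();
    } finally {
        // Also runs if the wait is interrupted, so the counter cannot drift.
        currentIdleThreadCount.decrementAndGet();
    }
}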
Next, let’s consider the other scenario, where timed is true. In this case, the thread will call the poll(long timeout, TimeUnit unit) method of the work queue with a timeout value. Here, too, any thread that enters the poll() method is currently idle, so we can override this method in our work queue to increment currentIdleThreadCount at the beginning. Then we can call the actual poll() method, which could result in one of the following two scenarios.
If there are no tasks in the queue, the thread will wait in this call for the provided timeout and then return null. By this time, the thread has timed out and will soon be evicted from the pool, reducing the number of idle threads by 1. So we can decrement our counter at this point.
If there is any task, it will be returned by the method call. So now this thread is no longer idle and we can decrement our counter at this point, too.
So our overridden poll(long timeout, TimeUnit unit) method looks like below.
@Override
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
    // A thread entering poll() is idle until it gets a task or times out.
    currentIdleThreadCount.incrementAndGet();
    try {
        return super.poll(timeout, unit);
    } finally {
        // Runs on a task, a timeout (null), or an interrupt alike.
        currentIdleThreadCount.decrementAndGet();
    }
}
Therefore, with these new implementations of the offer(), take(), and poll(long timeout, TimeUnit unit) methods, our thread pool will now be scaled up when it has no idle worker threads to take up new tasks.
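Put together, the three overrides could live in a queue class such as the one sketched below. The article does not say which queue the overrides extend, so LinkedBlockingQueue is an assumption, as are the names IdleAwareQueue, idleCount(), and demo(). The counter is decremented in a finally block so that it is restored even if the wait is interrupted. The demo() method checks both sides of the idle test: an offer with no waiting consumer, and an offer once a consumer thread is blocked in take().

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class IdleAwareQueue extends LinkedBlockingQueue<Runnable> {

    private final AtomicInteger currentIdleThreadCount = new AtomicInteger();

    public IdleAwareQueue(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(Runnable e) {
        // Enqueue only when an idle thread is waiting to pick the task up.
        return currentIdleThreadCount.get() > 0 && super.offer(e);
    }

    @Override
    public Runnable take() throws InterruptedException {
        currentIdleThreadCount.incrementAndGet();
        try {
            return super.take();
        } finally {
            currentIdleThreadCount.decrementAndGet();
        }
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        currentIdleThreadCount.incrementAndGet();
        try {
            return super.poll(timeout, unit);
        } finally {
            currentIdleThreadCount.decrementAndGet();
        }
    }

    int idleCount() {
        return currentIdleThreadCount.get();
    }

    // Returns {offer with no idle consumer, offer with one idle consumer}.
    static boolean[] demo() {
        IdleAwareQueue queue = new IdleAwareQueue(10);
        boolean withoutIdle = queue.offer(() -> { });

        Thread consumer = new Thread(() -> {
            try {
                queue.take().run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        while (queue.idleCount() == 0) {
            LockSupport.parkNanos(1_000_000L); // wait until the consumer is inside take()
        }
        boolean withIdle = queue.offer(() -> { });
        return new boolean[] { withoutIdle, withIdle };
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println(results[0] + " " + results[1]); // false true
    }
}
```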
Introducing Queueing After Scaling
Now have we implemented our scale-first executor service? Unfortunately not. Our current implementation is a scale-only ExecutorService, which will only try to scale-up. Once it comes to the max pool size, it will reject the tasks instead of trying to queue them. So let’s fix that as well.
Fixing that is quite simple. The thread pool executor provides the flexibility to configure a RejectedExecutionHandler, which will be called whenever a task is rejected from the thread pool. So we can implement a custom rejection handler, which will first try to put the rejected task back into the work queue. If the work queue cannot accept the task (i.e. the queue is full), then it will call the original rejection handler, which will either throw a RejectedExecutionException or handle the rejection according to user-defined logic.
class ReEnqueuePolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.getQueue().add(r)) {
            rejectedExecutionHandler.rejectedExecution(r, executor);
        }
    }
}
Please note that the rejectedExecutionHandler variable holds the original rejection handler of the thread pool. In order to guarantee the correctness of this implementation, we need to override the add() method of the work queue as below:
@Override
public boolean add(Runnable runnable) {
    return super.offer(runnable);
}
Now the implementation is all done and dusted and we have a perfect and very much real scale-first executor service on top of Java’s ThreadPoolExecutor.
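For reference, here is one way the pieces could be assembled and exercised end to end. This is a sketch under stated assumptions, not the Project-X code: the outer class ScaleFirstDemo, the LinkedBlockingQueue base class, the fallback field name, and the tiny pool sizes (core 1, max 2, queue capacity 1) are all chosen for illustration, and AbortPolicy stands in for the original rejection handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScaleFirstDemo {

    // Work queue carrying the offer(), take(), poll() and add() overrides.
    static class ScaleFirstQueue extends LinkedBlockingQueue<Runnable> {
        private final AtomicInteger currentIdleThreadCount = new AtomicInteger();

        ScaleFirstQueue(int capacity) { super(capacity); }

        @Override public boolean offer(Runnable e) {
            return currentIdleThreadCount.get() > 0 && super.offer(e);
        }
        @Override public Runnable take() throws InterruptedException {
            currentIdleThreadCount.incrementAndGet();
            try { return super.take(); } finally { currentIdleThreadCount.decrementAndGet(); }
        }
        @Override public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
            currentIdleThreadCount.incrementAndGet();
            try { return super.poll(timeout, unit); } finally { currentIdleThreadCount.decrementAndGet(); }
        }
        @Override public boolean add(Runnable r) {
            return super.offer(r); // skips the idle check; returns false when full
        }
    }

    // Re-enqueues a rejected task; delegates to the original handler if the queue is full.
    static class ReEnqueuePolicy implements RejectedExecutionHandler {
        private final RejectedExecutionHandler fallback;

        ReEnqueuePolicy(RejectedExecutionHandler fallback) { this.fallback = fallback; }

        @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (!executor.getQueue().add(r)) {
                fallback.rejectedExecution(r, executor);
            }
        }
    }

    // Submits four blocking tasks and records "threads/queued" after each one.
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ScaleFirstQueue(1),
                new ReEnqueuePolicy(new ThreadPoolExecutor.AbortPolicy()));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocking);
                log.add(pool.getPoolSize() + "/" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                log.add("rejected");
            }
        }
        release.countDown();
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1/0, 2/0, 2/1, rejected]
    }
}
```

With these sizes, the second task starts a second thread instead of being queued, the third task is queued only after the pool has reached its max size, and the fourth is rejected once both the pool and the queue are full.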
Published at DZone with permission of Udith Gunaratna. See the original article here.