Lightweight Parallel Tasks in Java Microservices
BascomTask, a lightweight in-process task orchestration library, helps rein in the complexities involved with processing data from multiple sources.
The Importance of Lightweight Tasks
One of the corollaries of embracing microservices is the increased need to aggregate information from multiple remote sources. In turn, exploiting every opportunity to execute such operations in parallel becomes more important and managing the resulting complexities becomes more central to the organization of your code. There are several core Java language and library options available to achieve this, but their generality makes them relatively low-level for this purpose: they lack a task abstraction.
A task in this context is a class that performs a non-trivial, track-worthy unit of work in an execution graph, potentially comprised of many such tasks. Tasks are important because interacting with an external system or database often involves details that are best encapsulated in a separate class. Done well, this separation of concerns facilitates refactoring at a logical level without impedance from the particular complexities involved with any external interaction. Such refactoring is to be expected as business goals evolve – which is one of the reasons for embracing microservices in the first place.
The need, however, extends beyond just class structure. While any Java code can create and call task classes, cross-cutting functionality such as tracing and profiling is best injected automatically rather than requiring explicit programmer coding. Because they represent non-trivial units of work, tasks should be parallelizable across multiple threads where possible while maintaining proper ordering, so that tasks supplying values are executed prior to tasks consuming those values, preferably without requiring the latter to block-wait. The mechanism for wiring tasks together (which tasks to execute, how to execute them, and what dependencies they have) becomes a central focus. How this is accomplished, for better or worse, involves several considerations:
- How is parallelism among tasks specified, preferably with as little manual programmer effort as possible? It is not always so easy to choose the optimal path, and manually-expressed optimization decisions may have to be revisited after every refactoring.
- The required idioms should require minimal syntax elaboration with correspondingly minimal opportunities for introducing programming errors, all the more critical because timing errors or hidden inefficiencies among parallel threads can be non-deterministic, difficult to reproduce, and difficult to trace.
- Composing the actual task classes should not require compromising core language capabilities such as type safety. One does not use Java only to have to make compromises on behalf of libraries or frameworks that dictate things you cannot do without extra effort or not at all.
- The solution should keep simple cases simple while scaling to endpoints that can grow to tens or hundreds of tasks. You should not have to rethink your application code design just because you're adding more tasks.
Using Standard Library Features
The simplest approach might seem to be directly managing threads pulled from a pool, but it is then difficult to manage ordering dependencies, and moving data among spawned threads and propagating exceptions takes careful programming. CompletableFutures deal with these and other problems much more effectively by abstracting away much of the mechanics of thread management and by providing a very rich capability to chain transformations together using fluent operations. Most of these methods have a sync and an async variant, e.g. thenApply() vs. thenApplyAsync(), enabling parallelism at each step by choosing the appropriate variant. There is no direct task support, but also no restriction on calling out to any kind of task class.
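As a minimal sketch of the sync/async distinction, the following chains both variants using only the JDK. The class name ChainingVariants, the method name chained, and the arithmetic are illustrative assumptions, not part of the original article.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example class; names and arithmetic are for illustration only.
class ChainingVariants {
    static int chained(int seed) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture
                // First stage runs on a pool thread.
                .supplyAsync(() -> seed + 1, pool)
                // Sync variant: runs in the thread that completes the previous stage,
                // or in the calling thread if that stage has already completed.
                .thenApply(v -> v * 2)
                // Async variant: always resubmitted to the given executor.
                .thenApplyAsync(v -> v + 10, pool)
                .join();
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling ChainingVariants.chained(1) evaluates ((1 + 1) * 2) + 10. The result is the same whichever variant is chosen; only the thread that runs each step differs.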
On the downside, the CompletableFuture API is on the complex side, particularly when it comes to aggregating inputs for task-level methods. A particular quirk that shows up relatively more often in this context is that any call requiring more than two inputs must be wired differently than calls with one or two inputs. The difference can be seen in the example of collecting the outputs of two vs. three elements in a list. Two inputs can be handled one way:
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> task.compute(1));
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> task.compute(2));
List<Integer> list = a.thenCombine(b, List::of).join();
Three inputs, in contrast, require a different approach, such as using allOf(), which completes only when all of its inputs complete. Unfortunately, this also requires that those inputs (a, b, and c, in this case) be repeated separately and redundantly from their eventual use, increasing the possibility of programming error:
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> task.compute(1));
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> task.compute(2));
CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> task.compute(3));
CompletableFuture.allOf(a, b, c).thenAccept(v->System.out.println(List.of(a,b,c)));
Moreover, allOf() does not return any useful value. While it is straightforward to work with any processed data items within the nested block, exposing that value outside the nested block requires several transformational steps, such as the following:
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> task.compute(1));
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> task.compute(2));
CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> task.compute(3));
List<Integer> list = CompletableFuture.allOf(a, b, c)
.thenApply(v ->
List.of(a, b, c).stream().map(CompletableFuture::join)
.collect(Collectors.toList())).join();
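One common way to contain this boilerplate with the plain JDK is to wrap the allOf() idiom in a small generic helper, so that the list of inputs is written only once. The sketch below assumes a hypothetical helper class Futures with a method allAsList; neither is part of the JDK or of BascomTask.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper, shown only to illustrate the idiom.
class Futures {
    // Completes with all results, in input order, once every input has completed.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            // join() cannot block here because allOf() guarantees completion.
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    static List<Integer> demo() {
        List<CompletableFuture<Integer>> inputs = List.of(
            CompletableFuture.supplyAsync(() -> 1),
            CompletableFuture.supplyAsync(() -> 2),
            CompletableFuture.supplyAsync(() -> 3));
        return allAsList(inputs).join();
    }
}
```

This removes the repetition, but only for inputs of a uniform type; mixed-type inputs still need the more manual wiring.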
Much like lambdas in general, CompletableFutures are quite elegant from a Java perspective, but also quite verbose from a purely functional perspective. The CompletableFuture class has on the order of 50 execution and chaining methods with various subtleties that take some effort to master and use effectively at scale.
BascomTask
BascomTask is a lightweight task orchestration library that provides thread-level parallelization in a way that is as frictionless as possible. This means that, by design, using BascomTask is very close to pure Java code in both usage and performance, including use of CompletableFutures where they can be used effectively, but without having to rely solely on them for task-level organization of a codebase. BascomTask aims to complement rather than replace CompletableFutures and freely integrates with them.
Bringing BascomTask into your code, or removing it, is correspondingly a minor amount of work. BascomTask is not specifically tied to microservice design and in fact brings in a very minimal amount of library dependencies, but its particular feature set provides great help with many of the common challenges of building microservices.
Its primary mechanics are evident in wiring a single task method:
Orchestrator $ = Orchestrator.create();
TaskImpl task = new TaskImpl();
CompletableFuture<Integer> a = $.task(task).compute(1);
int v = a.get();
The syntax for invoking a task method in BascomTask is the Java method invocation itself, which in this example is the call to compute taking a single int as an argument. That is just an example method name – there are no pre-defined method overrides in BascomTask, so any method with any signature will work.
The difference from pure Java is that prior to making the task method call, the task is first passed through a $.task() call (the use of $ as a Java variable is simply a convention for readability; the variable name could instead be orchestrator or orc or anything else). As a result, the compute task method is not actually invoked at line 3. Instead, a Java dynamic proxy wrapper is returned from $.task() that intercepts the subsequent compute call and creates an internal dependency graph linking the task method to its arguments, without actually executing the compute method. A placeholder CompletableFuture is returned that does not yet have a value assigned (its isDone() method will return false). The actual execution of the compute method occurs as a result of retrieving the value of that CompletableFuture at line 4.
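To make the interception idea concrete, here is a small JDK-only sketch of a dynamic proxy that records a call and hands back an incomplete placeholder future instead of executing the method. It is an illustration of the general technique under simplifying assumptions (every intercepted method returns a CompletableFuture, no dependency tracking, no threading); it is not BascomTask's implementation, and the names Compute, DeferredCalls, wrap, and activateAll are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical task interface used only for this sketch.
interface Compute {
    CompletableFuture<Integer> compute(int x);
}

// Hypothetical recorder: NOT BascomTask, just the proxy technique in miniature.
class DeferredCalls {
    private final List<Runnable> pending = new ArrayList<>();

    // Returns a proxy whose method calls are recorded rather than executed.
    @SuppressWarnings("unchecked")
    <T> T wrap(Class<T> iface, T target) {
        InvocationHandler handler = (proxy, method, args) -> {
            CompletableFuture<Object> placeholder = new CompletableFuture<>();
            pending.add(() -> {
                try {
                    // Assumes the intercepted method returns a CompletableFuture.
                    CompletableFuture<Object> real =
                        (CompletableFuture<Object>) method.invoke(target, args);
                    real.whenComplete((value, error) -> {
                        if (error != null) placeholder.completeExceptionally(error);
                        else placeholder.complete(value);
                    });
                } catch (ReflectiveOperationException e) {
                    placeholder.completeExceptionally(e);
                }
            });
            return placeholder; // not yet done: nothing has executed
        };
        return (T) Proxy.newProxyInstance(
            iface.getClassLoader(), new Class<?>[] { iface }, handler);
    }

    // Executes every recorded call, completing the placeholders.
    void activateAll() {
        List<Runnable> batch = new ArrayList<>(pending);
        pending.clear();
        batch.forEach(Runnable::run);
    }

    static String demo() {
        DeferredCalls calls = new DeferredCalls();
        Compute real = x -> CompletableFuture.completedFuture(x * 10);
        Compute proxied = calls.wrap(Compute.class, real);
        CompletableFuture<Integer> f = proxied.compute(4); // recorded only
        boolean doneBeforeActivation = f.isDone();
        calls.activateAll();                               // now it runs
        return doneBeforeActivation + ":" + f.join();
    }
}
```

In this sketch the placeholder reports isDone() as false until activateAll() is called, mirroring the deferred behavior described above.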
The bookkeeping at line 3 that links task methods to their arguments is all internal. Programmers can simply include or exclude the $.task() call in line 3 to get the same functional result, with the only difference being the point at which the actual target task method is invoked (if at all) and (see later discussion) whether it would be allowed to execute in a different thread.
The call to get() internally invokes Orchestrator.activate(), which is the more general way to schedule task method execution, including allowing multiple task methods to be activated at the same time. The inputs can be supplied as varargs or, when they are uniformly typed, as a list, in which case a list of results is returned. The BascomTask alternative to the allOf() example from the previous section might look like this, using a variant of activate that waits until its results are complete:
Orchestrator $ = Orchestrator.create();
TaskImpl task = new TaskImpl();
CompletableFuture<Integer> a = $.task(task).compute(1);
CompletableFuture<Integer> b = $.task(task).compute(2);
CompletableFuture<Integer> c = $.task(task).compute(3);
List<Integer> list = $.activateAndWait(List.of(a,b,c));
POJO task class definitions require only that a task class has an interface which extends TaskInterface. That is simply a marker interface, with a handful of default utility methods, that has no required overrides:
interface IComputeTask extends TaskInterface<IComputeTask> {
    CompletableFuture<Integer> compute(int x);
}
class ComputeTask implements IComputeTask {
    public CompletableFuture<Integer> compute(int x) {
        // Just return the value, wrapped in an already-completed CompletableFuture
        return CompletableFuture.completedFuture(x);
    }
}
The notable thing in these examples is the lack of any sort of special-purpose dependency specification separate from the method signatures. It's just Java calling methods on a POJO task's interface. The method signatures themselves are used as the dependency specification.
The naturalness of this approach becomes more apparent as the execution graph becomes more complex with outputs of tasks feeding into other tasks. In this next example, a second task method, inc, is invoked (BascomTask does not restrict the number of task methods on any task class) that increments the value from the first:
Orchestrator $ = Orchestrator.create();
TaskImpl task = new TaskImpl();
CompletableFuture<Integer> a = $.task(task).compute(1);
CompletableFuture<Integer> b = $.task(task).inc(a);
int v = b.get();
On this foundation comes the biggest benefit of all: automatic and efficient thread-based parallelism without any programmer intervention. BascomTask automatically identifies parallelization opportunities and spawns new threads to do so, but in a conservative manner that optimizes thread usage. In the preceding example there is no point in spinning up new threads, so BascomTask executes both tasks in the calling thread. In the example below, in contrast, a task method, add, takes two arguments that each can be run in separate threads. BascomTask recognizes this opportunity and will automatically spawn a thread to execute one of the tasks while executing the other in the calling thread. When both are complete, the add task method is invoked in whichever of those threads finishes last (because then both inputs will be available):
Orchestrator $ = Orchestrator.create();
TaskImpl task = new TaskImpl();
CompletableFuture<Integer> a1 = $.task(task).compute(1);
CompletableFuture<Integer> a2 = $.task(task).compute(2);
CompletableFuture<Integer> added = $.task(task).add(a1,a2);
int v = added.get();
As in the previous example, execution is not started until the final get() call. Even then, the add task method does not run right away because its arguments have not yet completed. Activation of a task method, whether implicitly through get() or explicitly through any variant of activate(), schedules it for execution as soon as all of its CompletableFuture inputs (if any) have completed. Activation is also propagated to those inputs, recursively, so the method will execute as soon as possible but not necessarily right away.
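For comparison, the thread usage described for the add example can be approximated by hand with plain CompletableFuture: spawn one input, compute the other in the calling thread, and let the combining step run in whichever thread arrives last. This is a sketch of that shape only; the class ForkJoinByHand and the stand-in compute and add methods are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical hand-wired equivalent; compute/add are stand-ins for real task work.
class ForkJoinByHand {
    static int compute(int x) {
        return x * 10;
    }

    static int add(int x, int y) {
        return x + y;
    }

    static int computeAndAdd() {
        // One input is spawned onto the common pool...
        CompletableFuture<Integer> a1 = CompletableFuture.supplyAsync(() -> compute(1));
        // ...while the other is computed directly in the calling thread.
        int a2 = compute(2);
        // thenApply runs in the spawned thread if a1 is still in progress when this
        // stage is attached, or in the calling thread if a1 has already completed.
        return a1.thenApply(v1 -> add(v1, a2)).join();
    }
}
```

The point of the comparison is that the programmer had to decide which input to spawn and where to join; with more inputs, or after a refactoring, those decisions have to be revisited by hand.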
This lazy evaluation can be a great aid for complex graphs because it in effect separates the specification of task method dependencies from the decision to execute them. In a service (or any similar program) where there are multiple points where task output is consumed, this means you can specify dependencies independently without having to worry about doing task work that may not subsequently be needed. BascomTask allows you to build up a full dependency graph in one place with methods like $.task(), and elsewhere simply access only those elements that are actually needed. BascomTask will ensure that task methods are executed once, if needed, with a minimum of thread allocations (leveraging the calling thread where possible).
This is illustrated in the example execution graph below. Each oval represents a task method with outgoing dependencies among them (e.g., task method 6 takes the output of task methods 2 and 3 as arguments).
When a get() is made on the CompletableFuture returned from task method A, that task method is activated (scheduled for execution). BascomTask then works backwards to activate any of its CompletableFuture inputs, recursively, eventually finding task methods 2, 3, and 4 that have no inputs and can therefore be executed immediately. Since there is more than one, they can be run in parallel, so two additional threads (blue and orange) are spawned (pulled from an executor pool) while the green calling thread keeps (non-deterministically) one of the tasks for itself. Once all of those roots are started, execution flows forward. As each task method is completed, its thread checks whether the resulting CompletableFuture supplies the final argument to one of its downstream task methods and, if so, executes that task method; otherwise the thread terminates. The red thread, for example, provides the final argument for task method 7 (assuming task method 4 completed first), so it executes task method 7, but once that is completed the thread terminates because task A does not yet have all of its arguments. Only the green calling thread (in this example) supplies the final argument for task method A (assuming red 7 completed first), so it executes that task method and its value is returned as a result of the get() call.
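The rule that a downstream task method runs in whichever thread supplies its final argument can be sketched with an atomic countdown. The following JDK-only example is an illustration of that rule under simplifying assumptions, not BascomTask's scheduler; the class JoinNode and its inputCompleted method are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical node: the thread that delivers the last input runs the downstream work.
class JoinNode {
    private final AtomicInteger remaining;
    private final Runnable downstream;

    JoinNode(int inputCount, Runnable downstream) {
        this.remaining = new AtomicInteger(inputCount);
        this.downstream = downstream;
    }

    // Called by each upstream thread when its input is ready.
    void inputCompleted() {
        // Exactly one caller observes zero, so the downstream work runs exactly once,
        // in that caller's thread; all other callers simply return.
        if (remaining.decrementAndGet() == 0) {
            downstream.run();
        }
    }

    static int demo() {
        AtomicInteger downstreamRuns = new AtomicInteger();
        JoinNode node = new JoinNode(2, downstreamRuns::incrementAndGet);
        CompletableFuture<Void> t1 = CompletableFuture.runAsync(node::inputCompleted);
        CompletableFuture<Void> t2 = CompletableFuture.runAsync(node::inputCompleted);
        CompletableFuture.allOf(t1, t2).join();
        return downstreamRuns.get();
    }
}
```

Which of the two threads runs the downstream work is non-deterministic, but no thread ever blocks waiting for the other, which is the property the walkthrough above relies on.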
While automated parallelism is the default, there are several ways to fine-tune its behavior. A simple @light annotation added to any task method indicates that the method does little work, so BascomTask should never bother to spin up a separate thread to execute it. That behavior can also be forced at the wiring level by calling light() on a task after adding it to an orchestrator, e.g.
CompletableFuture<Integer> added = $.task(task).light().add(a1,a2);
Similarly, a runSpawned() method is available to force spawning if desired. BascomTask also provides various options such as forcing all or no task spawning on an orchestrator or for all orchestrators. Several variations of activate() are also available to indicate whether the calling thread should wait or not, or for collecting the results in an asynchronous callback all at once or individually as soon as their results become available.
Simplifying Conditional Wiring Logic
Conditional logic can quickly make task wiring complex. A common use case for service endpoints is with optional parameters that limit or expand the default result scope, which, in turn, has implications for limiting or expanding the set of tasks that are invoked. The wiring implications can be tricky because adding or removing a task with an if-then-else statement may have implications on downstream tasks that depend on that task. Consider the following non-BascomTask example with some form of boolean input having been set in bv:
CompletableFuture<X> cfx = ...;
CompletableFuture<Y> cfy = bv ? task.computeY(...) : null;
// later...
CompletableFuture<Y> r = bv ? task.compute(cfx,cfy) : task.compute(cfx);
r.get();
What's problematic with the above is that bv must be repeated both at the point where we decide on whether to execute the task as well as the point(s) where the task output is used. We could also check whether cfy is null, but the effect is the same: having to precisely correlate conditional decision points across several points in the code, or else bad things will happen. This correlation requirement becomes unwieldy and error-prone as you add tasks and conditions.
BascomTask can improve the situation because it separates specification from execution, and the specification part need not account for whether or not a task is executed at all. In the BascomTask version of the above, bv need only be evaluated once, further simplified by leveraging a built-in BascomTask version of the Java ternary operator:
CompletableFuture<X> cfx = ...;
CompletableFuture<Y> cfy = task.computeY(...);
// later...
CompletableFuture<Y> r = $.cond(bv, task.compute(cfx,cfy), task.compute(cfx));
r.get();
In this example, the call to get() in the last line activates the built-in $.cond() task method, which activates bv (whose definition is not shown above), which, when complete, activates one of its arguments (one of the compute calls), which, in turn, activates its arguments, and so on. The computeY task method will never be invoked if bv evaluates as false.
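The idea of evaluating the condition first and only then starting the chosen branch can be approximated with the plain JDK by using thenCompose() over suppliers. The sketch below is an analogy for readers familiar with CompletableFuture, not BascomTask's $.cond() implementation; the class Conditional, the method cond, and the branch labels are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical JDK-only analogue of a future-based ternary.
class Conditional {
    // Only the supplier for the selected branch is ever invoked.
    static <T> CompletableFuture<T> cond(
            CompletableFuture<Boolean> condition,
            Supplier<CompletableFuture<T>> whenTrue,
            Supplier<CompletableFuture<T>> whenFalse) {
        return condition.thenCompose(b -> b ? whenTrue.get() : whenFalse.get());
    }

    static String demo() {
        AtomicInteger trueBranchRuns = new AtomicInteger();
        CompletableFuture<Boolean> bv = CompletableFuture.supplyAsync(() -> false);
        CompletableFuture<String> r = cond(
            bv,
            () -> CompletableFuture.supplyAsync(() -> {
                trueBranchRuns.incrementAndGet();
                return "with-y";
            }),
            () -> CompletableFuture.completedFuture("without-y"));
        // The true branch was never started because bv completed with false.
        return r.join() + ":" + trueBranchRuns.get();
    }
}
```

Note the difference in ergonomics: this version needs each branch wrapped in a supplier to stay lazy, whereas the BascomTask wiring above passes the task method results directly because they are not executed until activated.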
From a performance standpoint, you might not always want to delay the computation behind cfx or cfy until after bv completes. A variant of $.cond() is therefore available with two additional boolean arguments that indicate whether either or both of its alternatives should be executed proactively. The choice is left to the programmer, because proactively executing an alternative whose result may not be needed is not always desirable:
CompletableFuture<X> r = $.cond(bv,cfx,false,cfy,true);
Because in this example the final argument is passed as true, when $.cond() is activated then cfy will be activated at the same time as bv, though cfy will only be reflected in the result if bv evaluates to true.
Additional Features
BascomTask provides additional features that are very useful in microservice development, including:
- Creating simple tasks from lambdas without requiring class definitions.
- Targeted exception handling to provide compensating actions in response to failures.
- Dynamically adding new tasks anywhere including from within nested tasks.
- Customizing task start/end actions globally or separately for each Orchestrator.
- Using built-ins for tracing, execution profiling, etc.
The Conductor Pattern
For many service implementations, embedding task wiring directly into service processing logic is sufficient. Some implementations, on the other hand, are spread out across multiple components that need to access task results at different times and in different places. This leads to the challenge of "at most once but only once." Tasks often represent relatively expensive units of work, so it is worthwhile to avoid computing task results if not needed, but for these more complicated implementations it is not known up front exactly which tasks those will be. Once a result has been computed, it is wasteful to compute it again if that result is needed at a separate point in the code.
Passing around multiple task results separately solves the problem but is cumbersome and impedes code evolution. Aggregating results in a holder class solves that problem but leaves undefined the responsibility of where and when the wiring occurs. The Conductor Pattern is simply the suggestion that task wiring be encapsulated in a class with the sole responsibility of managing task wiring. Its consumers only see the results:
A Conductor may compute its results in aggregate, it may compute them incrementally on demand, or some combination of both. Those decisions are best hidden from consumers. Much like the role a conductor plays in an orchestra, a Conductor in this pattern directs when and where to compute task results. Its consumers (the audience) simply enjoy the results at their leisure.
This pattern is agnostic as to BascomTask or any particular orchestration mechanism, but BascomTask is particularly well suited for this purpose because of its lazy activation mechanism. With BascomTask, the Conductor implementation can create all or much of the dependency graph at startup and selectively expose shared results through getter methods. The Conductor deals with the "only once" problem, and BascomTask with the "at most once" problem because it inherently knows not to activate task methods until needed.
As an example, here is a rewrite of the earlier BascomTask add task example embedded in a Conductor class:
public class Conductor {
private Orchestrator $ = Orchestrator.create();
private final CompletableFuture<Integer> a1;
private final CompletableFuture<Integer> added;
public Conductor(int input) {
TaskImpl task = new TaskImpl();
a1 = $.task(task).compute(1);
CompletableFuture<Integer> a2 = $.task(task).compute(2);
added = $.task(task).add(a1,a2);
}
public CompletableFuture<Integer> getA1() {
return $.activate(a1);
}
public CompletableFuture<Integer> getAdded() {
return $.activate(added);
}
}
The wiring is done in the constructor in this example, though it might also be broken out into separate wiring methods. The constructor takes an integer input, which for service implementations might come from a query parameter or equivalent (if that parameter was needed elsewhere it could also be stored and exposed with a getter, making a Conductor a useful place to propagate such inputs). Otherwise, only the values that are intended to be shared need be stored and exposed with getters, including CompletableFuture results from task methods. In this example, a2 is part of the wiring but need not be exposed externally, so it is not saved.
When exposed through getters, CompletableFutures can be returned directly or after having been activated. The latter choice is typical, as in the example above, because it allows callers to apply chaining methods such as thenApply() without themselves having to deal with activation or even being exposed to or aware of BascomTask at all.
Summary
The predominant mechanism for managing asynchronous Java threads is CompletableFutures, which have extensive support for parallelization and chaining, but are not ideal when used as the primary structuring mechanism to manage execution at the task level. BascomTask complements CompletableFutures by adding a task focus, simplifying the wiring and management of tasks and providing unique task-focused features.
BascomTask provides a low-friction way to exploit parallel task execution that is a common need for microservices. It is end-to-end asynchronous, which is important for high-volume services without requiring blocking threads at any level. It is lightweight, highly performant, feature-rich, and takes a minor effort to add to or remove from your code.
Complex service implementations may benefit from the Conductor Pattern in conjunction with BascomTask. Together, they provide highly efficient and effective resolution of the "at most once but only once" problem.
Opinions expressed by DZone contributors are their own.