OO, Functional, Imperative, and Reactive: All Weaved Together
Learn more about how you can combine OO, function, imperative, reactive programming concepts.
Join the DZone community and get the full member experience.
Join For FreeThis is the first of a two part series discussing how different paradigms in programming can be weaved together seamlessly via the "first-class procedure," a term I'm using to best describe the concept.
The working code in this article demonstrates how you can seamlessly weave together the following to service a request:
- Validate a request (on socket event loop thread).
- Start a transaction and register the request in the database. This will be on another thread to avoid halting the socket event loop thread.
- Make reactive calls to pull in data from other services.
- Run some functional code to work out the standard deviation on service times.
- Undertake alternate flows to handle special cases (including handling exceptions). Then, if no exceptions causing rollback, store results in the database, this, again, is on a different thread to not tie up the reactive event loop thread.
- Send the response after committing the transaction
This allows you to use the programming paradigm best suited to the various problems at hand. Note that the request servicing in the demonstration is arbitrary. The focus is on showing how the various programming paradigms can be weaved together.
Now, to write a complete description of the first class procedure is beyond a single article. There are many patterns used together to enable the composition through first-class procedures. Therefore, I'm going to provide an introduction to first-class procedures in two parts:
- This article with demonstrate, with working code, how flexible and easy composition is with first-class procedures
- The next article will provide an explanation more closely aligned to the theory on how the first-class procedure has evolved to its current understanding
We'll start with some simple examples and then get to the more interesting weaving together of first-class procedures.
First-Class Procedure
Simple Event Loop
The following first class procedure services a REST request. This will be run on the HTTP socket event loop thread.
public void service(ObjectResponse<ServicedThreadResponse> response) {
response.send(new ServicedThreadResponse(
Thread.currentThread().getName(), "Event", System.currentTimeMillis()));
}
Simple Thread-Per-Request
The following first class procedure services a REST request by pulling a value from the database and sending it in the response. This will be run by a separate thread pool.
public void service(
ServicedThreadRequest request,
ThreadPerRequestRepository repository,
ObjectResponse<ServicedThreadResponse> response) {
int identifier = request.getIdentifier() % 10;
ThreadPerRequest entity = repository.findById(identifier).get();
response.send(new ServicedThreadResponse(
Thread.currentThread().getName(), entity.getName(), System.currentTimeMillis()));
}
The distinction of thread will be discussed later. However, for now, notice that a Spring Repository is used by only the thread-per-request first class procedure.
First-Class Procedures Weaved Together
OK, the above is little boring. We've seen this in web application servers before, now show us something interesting!
To show something more interesting, we are going to weave first-class procedures together to achieve the example detailed at the start of this article.
Each step in the request servicing is implemented as a first-class procedure. We'll address each first-class procedure in the order specified.
Validate Request (on Socket Event Loop)
This is simple validation that the request is correct. As it is straight forward logic, we use the thread of the socket event loop. This way, we don't have to pay overheads of a thread context switch and threading overheads to reject invalid requests. The code is as follows:
const HttpException = Java.type("net.officefloor.server.http.HttpException");
const Integer = Java.type("java.lang.Integer")
function validate(identifier, requestIdentifier) {
if (Number(identifier) <= 0) {
throw new HttpException(422, "Invalid identifier");
}
requestIdentifier.set(Integer.valueOf(identifier))
}
validate.officefloor = [
{ httpPathParameter: "identifier" },
{ out: Integer },
{ next : "valid" }
];
Note that the validation is written in JavaScript. This is so that the client-side JavaScript validation rules can be re-used to validate requests to ensure consistency between client and server.
Theofficefloor
attribute added to the function provides meta-data. This is necessary, as JavaScript does not provide the strongly typed information required of first class procedures.
Imperative to Register Request in the Database
After validation, the request identifier is registered in the database. This also creates a unique number for the request based on an IDENTITY column in the database.
@Next("registered")
public static void registerRequest(
@Val int requestIdentifier,
WeavedRequestRepository repository,
Out<WeavedRequest> weavedRequest) {
WeavedRequest entity = new WeavedRequest(requestIdentifier);
repository.save(entity);
weavedRequest.set(entity);
}
Reactive
The next is some Reactive code to concurrently call the two REST end points detailed at the start of this article (simple event loop and simple thread-per-request). Because we are using Reactive, we can call them concurrently to improve performance.
Note that while waiting on the responses, the flow is effectively idle with threads servicing other functionality. This is asynchronous handling so that threads are not tied up waiting. Once both sets of results come back, they notify the respective asynchronous flow to continue processing.
By now you may be noticing the Out
/@Val
combinations. This is how values can be passed from one first class procedure to another first class procedure. Note that if type for different values is the same, a qualifier can be used to distinguish them. The rest of the arguments are provided from dependency injection (in this case Spring).
private final static String URL = "http://localhost:7878/{path}";
@Next("useData")
public static void retrieveData(
WebClient client,
AsynchronousFlow eventLoopFlow,
@EventLoopResponse Out<ServicedThreadResponse> eventLoopResponse,
@Val WeavedRequest request,
AsynchronousFlow threadPerRequestFlow,
@ThreadPerRequestResponse Out<ServicedThreadResponse> threadPerRequestResponse) {
Flux.range(1, 10)
.map((index) ->
client.get().uri(URL, "event-loop")
.retrieve().bodyToMono(ServicedThreadResponse.class))
.flatMap((response) -> response)
.collectList()
.subscribe((responses) -> eventLoopFlow.complete(
() -> eventLoopResponse.set(
responses.stream().toArray(ServicedThreadResponse[]::new))));
Flux.range(1, 10)
.map((index) ->
client.post().uri(URL, "thread-per-request")
.contentType(MediaType.APPLICATION_JSON)
.syncBody(new ServicedThreadRequest(request.getId()))
.retrieve().bodyToMono(ServicedThreadResponse.class))
.flatMap((response) -> response)
.collectList()
.subscribe((responses) -> threadPerRequestFlow.complete(
() -> threadPerRequestResponse.set(
responses.stream().toArray(ServicedThreadResponse[]::new))));
}
Functional
Next, the reactive responses are provided to Scala functional code to determine the standard deviation of service times.
def mean(timestamps: Iterable[Long]): Double =
timestamps.sum.toDouble / timestamps.size
def variance(timestamps: Iterable[Long]): Double = {
val avg = mean(timestamps)
timestamps.map(timestamp => math.pow(timestamp.toDouble - avg, 2)).sum / timestamps.size
}
def stdDev(timestamps: Iterable[Long]): Double = math.sqrt(variance(timestamps))
@Next("use")
def standardDeviation(
@EventLoopResponse @Val eventLoopResponses: Array[ServicedThreadResponse],
@ThreadPerRequestResponse @Val threadPerRequestResponses: Array[ServicedThreadResponse]
): Double =
stdDev(
(eventLoopResponses ++ threadPerRequestResponses)
.map(response => response.getTimestamp))
Note that a library could be used to reduce this code. However, we've done this to demonstrate how functional code can be integrated into first-class procedures.
Flow Control
The next first class procedure triggers a flow to handle special cases. Should there be no issues with the special cases, then it stores the standard deviation in the database.
@FlowInterface
public static interface Flows {
void handleSpecialCases(FlowSuccessful callback);
void stored();
}
public static void store(
@Parameter double standardDeviation,
Flows flows,
@Val WeavedRequest request,
WeavedRequestRepository repository,
Out<RequestStandardDeviation> stDevOut) {
flows.handleSpecialCases(() -> {
request.setRequestStandardDeviation(
new RequestStandardDeviation(standardDeviation, request));
repository.save(request);
stDevOut.set(request.getRequestStandardDeviation());
flows.stored();
});
}
The handling of the special cases is by the following first-class procedure.
public static void handleSpecialCase(
@Val WeavedRequest request
) throws WeavedRollbackException, WeavedCommitException {
switch (request.getRequestIdentifier()) {
case 3:
throw new WeavedRollbackException(request);
case 4:
throw new WeavedCommitException(request);
}
}
Touch of Exception Handling
The two exception handling first-class procedures are as follows:
public static void handle(
@Parameter WeavedRollbackException exception,
ObjectResponse<WeavedErrorResponse> response) {
WeavedRequest request = exception.getWeavedRequest();
response.send(new WeavedErrorResponse(request.getRequestIdentifier(), request.getId()));
}
public static void handle(
@Parameter WeavedCommitException exception,
WeavedRequestRepository repository,
ObjectResponse<WeavedErrorResponse> response) {
WeavedRequest request = exception.getWeavedRequest();
request.setWeavedError(new WeavedError(
"Request Identifier (" + request.getRequestIdentifier() + ") is special case",
request));
repository.save(request);
response.send(
new WeavedErrorResponse(request.getRequestIdentifier(), request.getId()));
}
The second handler works within the transaction, so it includes further data stored in the database.
Note that due to first-class procedure composition not requiring the caller to catch exceptions, checked exceptions are embraced. We consider checked exceptions very useful information in flow composition. However, the distinction is that it should not be the caller's concern but rather the flow's concern. To me, this is a big difference and stops the catch and log exception handling problem. Exception handling is now a separate concern that can be coded in afterwards.
Successful Response
On successful storage of the request details in the database, the following first class procedure sends the response.
public void send(
@Val WeavedRequest request,
@Val RequestStandardDeviation standardDeviation,
@EventLoopResponse @Val ServicedThreadResponse[] eventLoopResponse,
@ThreadPerRequestResponse @Val ServicedThreadResponse[] threadPerRequestResponse,
ObjectResponse<WeavedResponse> response) {
response.send(new WeavedResponse(
request.getRequestIdentifier(),
request.getId(),
eventLoopResponse,
threadPerRequestResponse,
standardDeviation.getStandardDeviation()));
}
Kotlin for Some OO
Oh, and just for a little bit more polyglot fun, the OO objects used to represent the JSON request/responses are the following.
@HttpObject
data class ServicedThreadRequest(
val identifier: Int)
data class ServicedThreadResponse(
val threadName: String,
val lookupName: String,
val timestamp: Long)
data class WeavedErrorResponse(
val requestIdentifier: Int,
val requestNumber: Int)
data class WeavedResponse(
val requestIdentifier: Int,
val requestNumber: Int,
val eventLoopResponses: Array,
val threadPerRequestResponses: Array,
val standardDeviation: Double)
Proving it Works
The following is a test to confirm the flow of first-class procedures that service the request.
public static final SpringRule spring = new SpringRule();
public static final OfficeFloorRule officeFloor = new OfficeFloorRule();
@ClassRule
public static final RuleChain ordered = RuleChain.outerRule(spring).around(officeFloor);
@Rule
public final HttpClientRule client = new HttpClientRule();
private static final ObjectMapper mapper = new ObjectMapper();
static {
mapper.registerModule(new KotlinModule());
}
@Test
public void confirmWeavedTogether() throws Exception {
HttpResponse response = this.client.execute(
new HttpPost(this.client.url("/weave/1")));
assertEquals("Should be successful", 200, response.getStatusLine().getStatusCode());
WeavedResponse body = mapper.readValue(
EntityUtils.toString(response.getEntity()), WeavedResponse.class);
WeavedRequest entity = spring.getBean(WeavedRequestRepository.class)
.findById(body.getRequestNumber()).get();
assertNotNull("Should have standard deviation stored", entity.getRequestStandardDeviation());
}
Weaving it all Together
The following diagram is the configuration to weave the above first-class procedures together.
This is the only configuration/code necessary to compose the first-class procedures together. Notice the names that represent the first-class procedure names and their respective meta-data.
What this means is we need to check the port on the all the calls and tests. Yes, everything you see above is running off the one port. You don't have to choose between a framework that provides only thread-per-request or single threaded event loops. This is because of the execution strategy provided by thread injection of first-class procedures.
Thread Injection
The threading configuration is actually the following:
<teams>
<team source="net.officefloor.frame.impl.spi.team.ExecutorCachedTeamSource"
type="org.springframework.data.repository.CrudRepository" />
</teams>
Here, we flag all procedures requiring a Spring Repository to be executed by a thread pool. Remember, I said to keep note of use of Spring Repository. Well, the above configuration has any first-class procedure requiring a Spring Repository executed by the configured thread pool. Note that thread pools are named teams, due to the modeling origins of first class procedures coming from Offices.
Therefore, looking at the flow again, the thread execution is as follows:
- Validate uses the thread of the socket listener event loop
- Register request uses a Spring Repository, so execution is swapped to a thread from the configured thread pool
- This thread carries onto trigger the asynchronous reactive calls
- The reactive event loop thread then invokes the callbacks. As the Scala code is quick to execute, the reactive event loop thread carries on to execute the Scala pure function. Here, it is deemed that a thread context switch is too much overhead, and it is more efficient to just invoke the highly optimized Scala pure function. However, if we want to separate the Scala function to different thread pool, we can configure in a different thread pool (typically via marker dependency on the first class procedure).
- The remaining imperative code has a switch back to a thread from the configured thread pool, as depends on Spring repository. Furthermore, the thread locals between the threads are propagated to each used thread, so the Spring Repository transaction is not lost (i.e. transaction is active for all first class procedures within the transaction bounds).
- Response is then sent.
Now, all the above is configurable via Thread Injection. If we have, for example, more than one synchronous data store, we can create a thread pool to interact with each data store to avoid one slow data store tying up all threads of the application.
This also means you can configure different threading for different environments without having to change any code.
Disclaimer
In a real world applications, I would try to avoid so many of the above programming languages together. I'd try to streamline them to just a couple to avoid too many skill sets involved driving up maintenance costs of your application (plus reduces problems for mixed compiling). This is only a demonstration of how OO, functional, imperative, and reactive code can all be weaved together with first-class procedures. Furthermore, it demonstrates how you can write concrete solutions before abstracting.
Also, as you can see, we've had to cover a lot of breadth in each programming paradigm. If the code is not a good representation of the paradigm, we're very happy to take feedback on improvements from those more acquainted with a particular paradigm.
And if we've missed an important paradigm, please let me know so we can consider including it. When it comes to coding, we appreciate diversity to give developers choice. We're trying to tear down fences between the paradigms to have one big happy coding family.
Summary
We've demonstrated how the first-class procedure can weave together polyglot code written in different paradigms to service a request. The code outlined above in the article is all the code required for the application. There is no further weaving code required.
Furthermore, to avoid the problems of it only works on my machine (in this article), the code for the above is available here. See the read me on how to run it.
For more understanding of what's going on, see the tutorials, my other articles.
And stay tuned for my next article on using the first-class procedure.
Published at DZone with permission of Daniel Sagenschneider. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments