File-Based Integration: Spring Integration vs Ballerina
In this article, we explore file-based integration and compare Spring Integration with Ballerina.
Introduction
File-based integration is one of the most popular and widely used approaches in data integration. Many software systems work with files — i.e., they either take in files as input, give their results as files, or do both — and working with multiple disparate systems requires connecting those systems together. Newer software systems also provide API-based access to information, and API-based integration is one of the approaches in connecting such systems. Although APIs provide advantages over traditional file-based systems, such as complex data structures and fine-grained security, many legacy software systems, and even modern ones, often use file-based approaches to export and import data.
Integration requires ensuring that two separate systems understand each other. Because file structures and data types will likely differ between software systems, we need an integration mechanism to map, transform, filter, and cleanse data so that the systems can understand each other. Some of the most popular file formats in general use are CSV, EDI, JSON, and XML.
Figure 1: A file integration scenario
As shown in Figure 1, the source application exports its information as files to be saved in the file system. The integration logic will connect to this file system, using protocols such as FTP/SFTP, and read in unprocessed files. Here, the file content will be analyzed, and after the required processing is done, it will output its result as files to the target application file system. The target application will be importing these newly created files.
Technologies for Integration
As mentioned earlier, the integration logic needed for our file processing scenario may contain operations such as filtering, enriching, and routing. You will note that these types of operations are applicable to other types of integration scenarios as well. So, if we are to implement similar integrations on multiple occasions, we will find ourselves implementing a lot of similar logic. This leads developers to create their own libraries, utility operations, or frameworks to extract this common logic and reuse it. The most frequently reused integration logic becomes a set of patterns.
A well-known set of integration patterns has been formalized in the book “Enterprise Integration Patterns” by Gregor Hohpe and Bobby Woolf. This has become the de facto standard in enterprise integration, and many programming frameworks have been implemented based on it.
In this article, we will use the following technologies to implement a file integration scenario and evaluate their features.
Spring Integration: This follows the enterprise integration patterns (EIPs) discussed in Hohpe and Woolf’s book and implements an integration solution on top of the Spring Framework. It is a feature-rich solution that has the full support of the Spring Framework.
Ballerina: Ballerina is a programming language for network-distributed applications. It contains first-class support for concepts such as services, network endpoints, and a network communications friendly type system.
Use Case: File Split FTP
Here, we will be re-using a Spring Integration sample for a file integration scenario, which can be found here. Spring Integration has an excellent collection of examples to start your own integration implementation.
Steps
1. Start reading all text files in the “/tmp/in” directory.
2. Read in each file and split its contents into individual messages, each of which represents a line in the file, and file marker events, which identify the start and the end of processing the file data.
3. Route the messages so that data lines are sent to one channel and the markers are sent to another channel.
4. The line data is sent through a channel to an integration flow, which checks the line header value to find out the file name and writes the content to that file.
5. The marker data is sent through a channel to another integration flow, where only the marker end message is let through, and a publish/subscribe endpoint is used to register multiple subscribers that write each file out to its own dedicated FTP location. Another subscriber sends an email to signal that processing is done.
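Steps 2 and 3 can be sketched in plain Java without any framework. This is a minimal illustration under stated assumptions, not the Spring Integration API: the class `FileSplitSketch`, its `Marker` type, and the sample lines are all hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: none of these types belong to Spring Integration.
final class FileSplitSketch {

    enum Mark { START, END }

    // A marker event that brackets the lines of one file.
    static final class Marker {
        final Mark mark;
        final String fileName;

        Marker(Mark mark, String fileName) {
            this.mark = mark;
            this.fileName = fileName;
        }
    }

    // Step 2: emit a START marker, one message per line, then an END marker.
    static List<Object> split(String fileName, List<String> lines) {
        List<Object> messages = new ArrayList<>();
        messages.add(new Marker(Mark.START, fileName));
        messages.addAll(lines);
        messages.add(new Marker(Mark.END, fileName));
        return messages;
    }

    // Step 3: route by payload type, lines to one channel and markers to another.
    static void route(List<Object> messages, List<String> lineChannel, List<Marker> markerChannel) {
        for (Object message : messages) {
            if (message instanceof Marker) {
                markerChannel.add((Marker) message);
            } else {
                lineChannel.add((String) message);
            }
        }
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        List<Marker> markers = new ArrayList<>();
        route(split("a.txt", Arrays.asList("x002 first", "x006 second")), lines, markers);
        System.out.println(lines.size() + " lines, " + markers.size() + " markers");
    }
}
```

The two lists stand in for the two channels; in a real integration flow each would feed its own downstream processing.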
Implementation
The solution’s design and its code segments are discussed in relation to each technology reviewed.
Spring Integration
Spring Integration implements most of the concepts defined in EIPs. It is built on top of the Spring Framework and adds abstractions for regularly used integration scenarios. Spring Integration follows a pipes-and-filters pattern, where the processing elements in a flow are connected by virtual pipes. The processing elements are known as message endpoints and the pipes as message channels. This ensures loose coupling between individual message endpoints. The message endpoints implement operations such as routing, filtering, splitting, and aggregation.
There are many types of channels; the main ones are direct channels and publish/subscribe channels. With a direct channel, two message endpoints are directly connected. With a pub/sub channel, one message endpoint sends messages to the channel, and many message endpoints can receive them.
Figure 3: Pipes/filters pattern
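The difference between the two channel types can be illustrated with a small plain-Java sketch. The classes below are hypothetical and heavily simplified (for instance, the direct channel here accepts a single subscriber only); Spring Integration's real channel implementations are richer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical classes for illustration only, not Spring Integration types.
final class ChannelSketch {

    // A direct channel hands each message to exactly one endpoint.
    static final class DirectChannel<T> {
        private Consumer<T> handler;

        void subscribe(Consumer<T> handler) {
            this.handler = handler;
        }

        void send(T message) {
            if (handler == null) {
                throw new IllegalStateException("no subscriber");
            }
            handler.accept(message);
        }
    }

    // A publish/subscribe channel delivers each message to every subscriber.
    static final class PubSubChannel<T> {
        private final List<Consumer<T>> handlers = new ArrayList<>();

        void subscribe(Consumer<T> handler) {
            handlers.add(handler);
        }

        void send(T message) {
            for (Consumer<T> handler : handlers) {
                handler.accept(message);
            }
        }
    }

    public static void main(String[] args) {
        PubSubChannel<String> markers = new PubSubChannel<>();
        markers.subscribe(m -> System.out.println("ftp subscriber got " + m));
        markers.subscribe(m -> System.out.println("mail subscriber got " + m));
        markers.send("END");
    }
}
```

Because the sender only knows the channel, endpoints can be added or replaced without touching the sending side, which is the loose coupling described above.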
Spring Integration provides an XML configuration-based approach as well as a Java DSL built on a fluent builder pattern. In our implementation, we will be using the Java DSL-based approach.
We will look at some of the main code snippets used with Spring Integration in implementing this scenario. An integration scenario is represented using an “IntegrationFlow” interface in Spring Integration. One of the ways of creating this would be to use a builder pattern starting with the “IntegrationFlows” class. Here, you can use this to create an input channel adapter in order to get messages into the system and connect them to a message channel. Each message that is created and put through the message channels is represented using the “Message” interface. This contains a domain-specific “payload” value and a set of headers. For example, a file inbound adapter will have “File” objects as payload values.
The listing below shows how files matching a certain pattern are read from the source directory. The text content of each file is split into separate lines, and along with the file line messages, file marker messages are emitted to show the start and the end of processing.
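To make the payload-plus-headers idea concrete, here is a minimal plain-Java sketch of such a message. `SimpleMessage`, `forFile`, and the `file_name` header key are hypothetical names used only for illustration; the real “Message” interface in Spring is a separate type with its own header handling.

```java
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a message: a typed payload plus read-only headers.
final class MessageSketch {

    static final class SimpleMessage<T> {
        private final T payload;
        private final Map<String, Object> headers;

        SimpleMessage(T payload, Map<String, Object> headers) {
            this.payload = payload;
            // Copy defensively so later changes to the caller's map do not leak in.
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }

        T getPayload() {
            return payload;
        }

        Map<String, Object> getHeaders() {
            return headers;
        }
    }

    // Wraps a file the way a file inbound adapter might: the File is the payload,
    // and metadata such as the file name travels in the headers.
    static SimpleMessage<File> forFile(File file) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("file_name", file.getName()); // illustrative header key
        return new SimpleMessage<>(file, headers);
    }

    public static void main(String[] args) {
        SimpleMessage<File> message = forFile(new File("/tmp/in/orders.txt"));
        System.out.println(message.getHeaders().get("file_name"));
    }
}
```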
public IntegrationFlow fromFile() {
    return IntegrationFlows.from(
            Files.inboundAdapter(new File("/tmp/in"))
                    .preventDuplicates(false)
                    .patternFilter("*.txt"), e -> e.poller(Pollers.fixedDelay(5000)
                            .errorChannel("tfrErrors.input"))
                    .id("fileInboundChannelAdapter"))
            .handle(Files.splitter(true, true))
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(FileSplitter.FileMarker.class, "markers.input")
                    .channelMapping(String.class, "lines.input"))
            .get();
}
Listing 1: Spring Integration — Reading file data
In line 03 of Listing 1, we signal to the framework that we are reading files from the directory “/tmp/in”, and we configure it to accept only files that have the “.txt” extension. The actual file reading is done in a polling fashion with a five-second polling interval. Thereafter, “Files.splitter” creates a message endpoint that splits each file into lines and emits them as messages with string payloads, along with messages that carry “FileMarker” objects as payloads. From there, a “route” endpoint routes each message based on its payload type: a “String” value is sent to the channel named “lines.input”, and a marker object of type “FileMarker” is sent to the “markers.input” channel.
For each bean representing a message endpoint, you can reach its implicit input channel by simply appending “.input” to the bean name. This is how the “markers” and “lines” message endpoints’ input channels are named “markers.input” and “lines.input”, respectively.
In the next step, let’s create the FileWritingMessageHandler, which is basically an output channel adapter, to write messages to files.
public FileWritingMessageHandler fileOut() {
    return Files.outboundAdapter("'/tmp/out'")
            .appendNewLine(true)
            .fileNameExpression("payload.substring(1, 4) + '.txt'")
            .get();
}
Listing 2: Spring Integration — Defining the output file channel adapter
Here, we simply define a file channel adapter that writes out data lines, deriving the file name from three characters near the start of each data line (“payload.substring(1, 4)” takes the second through fourth characters). The following shows the actual message endpoint used to write out the data lines.
public IntegrationFlow lines(FileWritingMessageHandler fileOut) {
    return f -> f.handle(fileOut);
}
Listing 3: Spring Integration — The integration flow to write out data lines to files
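The file name expression from Listing 2 can be tried out in plain Java to see which characters it picks. This is only an illustration: the helper `targetFileName` and the sample line are assumptions, and the real adapter evaluates the expression against each message payload.

```java
// Hypothetical helper that mirrors the expression payload.substring(1, 4) + '.txt'.
final class FileNameSketch {

    static String targetFileName(String line) {
        // substring(1, 4) skips the first character and takes the next three.
        return line.substring(1, 4) + ".txt";
    }

    public static void main(String[] args) {
        // The sample line format is an assumption made for this example.
        System.out.println(targetFileName("x002 some data")); // prints 002.txt
    }
}
```

A line shorter than four characters would make `substring` throw, so real input would need validation or an error channel to handle such lines.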
In the following integration flow, we wait for the file marker end message and flush the files that data was written to. Then, each of the files is picked up and sent to its own FTP location. Finally, an email is sent to notify that the processing is done.
public IntegrationFlow markers() {
    return f -> f.<FileSplitter.FileMarker>filter(m ->
                    m.getMark().equals(FileSplitter.FileMarker.Mark.END),
            e -> e.id("markerFilter"))
            .publishSubscribeChannel(s -> s
                    // first trigger file flushes
                    .subscribe(sf -> sf.transform("'/tmp/out/.*\\.txt'",
                            e -> e.id("toTriggerPattern"))
                            .trigger("fileOut", e -> e.id("flusher")))
                    // send the first file
                    .subscribe(sf -> sf.<FileSplitter.FileMarker,
                            File>transform(p -> new File("/tmp/out/002.txt"))
                            .enrichHeaders(h -> h.header(
                                    FileHeaders.FILENAME, "002.txt", true))
                            .handle(Ftp.outboundAdapter(ftp1())
                                    .remoteDirectory("foo"), e -> e.id("ftp002")))
                    // send the second file
                    .subscribe(sf -> sf.<FileSplitter.FileMarker,
                            File>transform(p -> new File("/tmp/out/006.txt"))
                            .enrichHeaders(h -> h.header(FileHeaders.FILENAME,
                                    "006.txt", true))
                            .handle(Ftp.outboundAdapter(ftp2()).remoteDirectory("foo"),
                                    e -> e.id("ftp006")))
                    // send the third file
                    .subscribe(sf -> sf.<FileSplitter.FileMarker, File>transform(
                            p -> new File("/tmp/out/009.txt"))
                            .enrichHeaders(h -> h.header(FileHeaders.FILENAME,
                                    "009.txt", true))
                            .handle(Ftp.outboundAdapter(ftp3()).remoteDirectory("foo"),
                                    e -> e.id("ftp009")))
                    // send an email
                    .subscribe(sf -> sf.transform(
                            FileSplitter.FileMarker::getFilePath)
                            .enrichHeaders(Mail.headers()
                                    .subject("File successfully split and transferred")
                                    .from("foo@bar")
                                    .toFunction(m -> new String[] { "bar@baz" }))
                            .enrichHeaders(h -> h.header(EMAIL_SUCCESS_SUFFIX, ".success"))
                            .channel("toMail.input")));
}
Listing 4: Spring Integration — Flush files, write to FTP, and send email
In the above integration flow, the message source is the channel that receives file marker messages. We filter the messages so that only marker end messages are considered, and process the output files from there. At this point, a pub/sub message channel is created to take in the marker message and send it out to multiple subscribers, each of which has its own integration flow. The first subscriber flow triggers the file flush, the next three each send a file to an FTP location, and the last one sends out an email to signal completion. The email flow and the initialization of its mail output adapter are shown below.
private MailProperties mailProperties;

public IntegrationFlow toMail() {
    return f -> f
            .handle(Mail.outboundAdapter(this.mailProperties.getHost())
                            .port(this.mailProperties.getPort())
                            .credentials(this.mailProperties.getUsername(),
                                    this.mailProperties.getPassword()),
                    e -> e.id("mailOut").advice(afterMailAdvice()));
}
Listing 5: Spring Integration — Email sending integration flow
Listing 5 shows the configuration of the mail output adapter, and how it can be used to send a mail.
Solution Analysis
The scenario implementation with Spring Integration is done using the Java DSL, but it can also be done using XML-based configuration. Developers who prefer a code-first approach will be more inclined to select the Java DSL. It provides a good balance between rolling your own integration code and a fully graphical integration solution. Spring Integration provides a rich set of functionality to implement most of the EIPs, and the wider Spring ecosystem provides additional features, such as Spring Data and Spring Security.
Also, whenever required, we can drill in and create custom integration logic, with a custom message handler (filter/message endpoint), and plug it into an integration flow construct. This gives the flexibility of having custom logic, while retaining the uniformity of following a single framework.
Ballerina
Ballerina follows a pure code-first approach to integration. The programming language maps to the requirements of modern application development and addresses key aspects such as being network-aware and having resilient communication features. These aspects are built into the language, as language features alongside the standard library, and this makes Ballerina an out-of-the-box solution for performing integration in an intuitive manner.
Let’s see how we can implement our sample use case using Ballerina. The source code for the below implementation can be found here.
Ballerina has a general services concept that is used to represent any incoming messages to a system. Here, we are defining a file listener and a service that binds to this listener, which can be used to monitor a local file system folder.
listener file:Listener filein = new ({
    path: "/tmp/in"
});

service file_reader on filein {
    resource function onCreate(file:FileEvent fe) returns error? {
        if (fe.name.endsWith(".txt")) {
            io:println("File: ", fe.name);
            check processFileData(<@untainted> fe);
            check ftpFiles();
            check sendDoneEmail();
        }
    }
}
Listing 6: Ballerina Integration — Defining a file listener service
The above setup takes care of listening for file changes in a directory and passing the file information to be processed by the “processFileData” function. Afterwards, the resultant files are transferred over FTP using the “ftpFiles” function, and, finally, a status email is sent using the “sendDoneEmail” function.
Let’s take a look at how the “processFileData” function is implemented.
function processFileData(file:FileEvent fe) returns error? {
    io:ReadableByteChannel bch = check io:openReadableFile("/tmp/in/" + fe.name);
    io:ReadableCharacterChannel cch = new(bch, "UTF-8");
    io:ReadableTextRecordChannel rch = new(cch, rs = "\n", fs = "*");
    while (rch.hasNext()) {
        string[] line = check rch.getNext();
        check writeFileLine("/tmp/out/" + line[0], line);
    }
}

function writeFileLine(string path, string[] line) returns error? {
    io:WritableByteChannel bch = check io:openWritableFile(path);
    io:WritableDataChannel dch = new(bch);
    string linex = "".'join(...line);
    check dch.writeString(linex, "UTF-8");
}
Listing 7: Ballerina Integration — Input file processing
Here, file data is read from the source represented by the earlier “FileEvent” instance. This file data is split into records, and in each record, the first field is used as the output file name. Each line is written to its respective file. This output location acts as a buffer area whose contents are later written to a remote FTP location.
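The record handling in Listing 7 can be mirrored in plain Java to show what ends up under each output name. This is a sketch under assumptions: `RecordSplitSketch` and the sample content are hypothetical, and the grouping happens in memory instead of writing files.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory version of the record splitting described above.
final class RecordSplitSketch {

    // Splits content into records on "\n" and into fields on "*",
    // then groups the joined records by their first field (the output file name).
    static Map<String, List<String>> groupByFirstField(String content) {
        Map<String, List<String>> byFile = new LinkedHashMap<>();
        for (String record : content.split("\n")) {
            if (record.isEmpty()) {
                continue; // skip blank lines
            }
            String[] fields = record.split("\\*");
            // Join the fields with an empty separator, as the Ballerina join call does.
            String joined = String.join("", fields);
            byFile.computeIfAbsent(fields[0], k -> new ArrayList<>()).add(joined);
        }
        return byFile;
    }

    public static void main(String[] args) {
        // Sample content is an assumption made for this example.
        System.out.println(groupByFirstField("1.txt*a*b\n2.txt*c\n1.txt*d\n"));
    }
}
```

Note that joining with an empty separator drops the field delimiters, so the output lines no longer carry the original "*" boundaries.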
In the next section, we will see how the FTP client endpoint is set up, followed by the logic used in writing our buffered data out to the final targets.
ftp:ClientEndpointConfig ftpConfig = {
    protocol: ftp:FTP,
    host: config:getAsString("ftp_host"),
    // 21 is the standard FTP control port; 20 is the active-mode data port
    port: 21,
    secureSocket: {
        basicAuth: {
            username: config:getAsString("ftp_user"),
            password: config:getAsString("ftp_pass")
        }
    }
};
ftp:Client ftpClient = new(ftpConfig);

function ftpFiles() returns error? {
    check ftpFile("/tmp/out/1.txt", "/data/1.txt");
    check ftpFile("/tmp/out/2.txt", "/data/2.txt");
    check ftpFile("/tmp/out/3.txt", "/data/3.txt");
}

function ftpFile(string src, string tgt) returns error? {
    io:ReadableByteChannel bch = check io:openReadableFile(src);
    check ftpClient.append(tgt, bch);
}
Listing 8: Ballerina Integration - FTP configuration
Here, we have defined our FTP target as a Ballerina network client. For this, the configuration is first given using an instance of “ftp:ClientEndpointConfig”, which contains the target FTP server-specific information. The Ballerina config API has been used to externalize the properties that are used for this. They can be passed in many ways, including an external configuration file or using environment variables. The “ftpFile” function implements the actual logic to write the source file contents to a target FTP location, where the buffered file content is simply appended to the target location.
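The idea of externalizing connection properties can be sketched in plain Java as a lookup that consults environment-style variables before file-style properties. Everything here is an assumption for illustration: `ConfigSketch`, the upper-casing of keys, and the precedence order are not taken from Ballerina's config API.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

// Hypothetical configuration lookup; precedence and key mapping are assumptions.
final class ConfigSketch {

    // Checks environment-style variables first (upper-cased key), then file properties.
    static String get(String key, Map<String, String> env, Properties fileProps) {
        String fromEnv = env.get(key.toUpperCase(Locale.ROOT));
        if (fromEnv != null) {
            return fromEnv;
        }
        String fromFile = fileProps.getProperty(key);
        if (fromFile == null) {
            throw new IllegalStateException("missing configuration: " + key);
        }
        return fromFile;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("ftp_host", "localhost"); // stands in for a loaded config file
        System.out.println(get("ftp_host", System.getenv(), props));
    }
}
```

Keeping host names and credentials out of the source in this way lets the same code run against different FTP servers per environment.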
The final step is to send a notification mail stating that a set of files were processed by our integration solution.
gmail:GmailConfiguration gmailConfig = {
    clientConfig: {
        auth: {
            scheme: http:OAUTH2,
            config: {
                grantType: http:DIRECT_TOKEN,
                config: {
                    accessToken: config:getAsString("gmail_at"),
                    refreshConfig: {
                        refreshUrl: gmail:REFRESH_URL,
                        refreshToken: config:getAsString("gmail_rt"),
                        clientId: config:getAsString("gmail_clientid"),
                        clientSecret: config:getAsString("gmail_clientsecret")
                    }
                }
            }
        }
    }
};
gmail:Client gmailClient = new(gmailConfig);

function sendDoneEmail() returns error? {
    gmail:MessageRequest messageRequest = {};
    messageRequest.recipient = "admin@acme.com";
    messageRequest.sender = "foo@acme.com";
    messageRequest.subject = "File Integration Executed";
    messageRequest.messageBody = "File Integration Done with FTP transfers";
    messageRequest.contentType = gmail:TEXT_PLAIN;
    check gmailClient->sendMessage("foo", messageRequest);
}
Listing 9: Ballerina Integration - Email notification
Here, we are using the Gmail connector to send out emails for the notification. The user has to provide credentials to authenticate before sending mail. This is done by creating a Gmail network client with an authentication configuration. Afterward, the “sendDoneEmail” function generates an email message and sends it out.
A notable design decision in Ballerina is the use of a sequence diagram model in organizing the code. Its networking constructs and operations are represented as actors and messages passed between them. Also, the concurrency model fits naturally into this model as an intuitive way of visualizing concurrent executions. Code written in Ballerina can be automatically visualized as a sequence diagram without any manual definitions.
Figures 4 and 5 show the graphical versions of the code in Listings 6 and 9, respectively.
Figure 4: Ballerina Integration — Defining a file listener service as a graphical view
Figure 5: Ballerina Integration — An email notification as a graphical view
Solution Analysis
Ballerina’s solution follows a more procedural pattern, compared with Spring Integration’s more declarative mode with its Java DSL. However, Ballerina’s services concept and its network endpoints make the file and other network operations feel more natural, and getting up to speed with its functionality feels more intuitive. Although Ballerina does not directly implement EIPs in the way Spring Integration does, its language features mean that this is not necessarily required to be productive in implementing integration logic.
The graphical view with the sequence diagram provides the best of both worlds in graphical modeling and code-first programming for integration. It is not meant to be a graphical-first mode for defining your logic, but rather an approach to self-document your code and have a high-level view of the project being developed. Ballerina’s principle is that integration and application code have become one, with no distinction between them anymore. Developers now perform integrations within their application code, and integrating with other systems is becoming the actual business logic.
Also, because Ballerina’s approach is closer to the business logic, developers have the flexibility to make tweaks when required. This is not directly possible with a standard framework, which curtails our freedom to some extent.
Conclusion
Both approaches have their own merits. However, if you already have a Spring Framework-based application, you will likely be drawn to Spring Integration owing to its large ecosystem and built-in functionality, with strict conformance to EIPs.
Ballerina provides a more novel approach by converging business logic and integration. The language tells us that there is no such thing as writing separate integration code: it has already become a natural part of a developer’s day-to-day coding. Ballerina succeeds in helping to define those operations more intuitively.