Working With Data in Microservices
In this post, I will focus on Ballerina’s support for processing and sharing data over the network.
A computer program is a set of instructions for manipulating data. Data is stored (and transferred) in a structured, machine-readable form that programs can process easily. Every year, new programming languages, frameworks, and technologies emerge to optimize data processing in computer programs.
Without proper support from the language or framework, developers cannot easily write programs that process data and extract meaningful information from it. Languages such as Python and R have evolved to specialize in data processing, while MATLAB and Octave specialize in numerical computing, including complex-number arithmetic.
However, for microservice development, where programs are distributed over the network, traditional languages have yet to address its unique needs. Ballerina is a new open-source programming language that provides a unique developer experience for working with data in network-distributed programs.
I have discussed Ballerina’s support for network programming in the Rethinking Programming: The Network in the Language article. In this post, I will focus on Ballerina’s support for processing and sharing data over the network.
Network and Data-Friendly Type System
Ballerina is a statically typed programming language: types are checked at compile time, and only compatible values can be assigned. Ballerina’s type system is structural and works with an abstraction of a value called a shape, which ignores the storage identity of the value. The article Rethinking Programming: Network-Aware Type System, written by Anjana Fernando, a director at WSO2, offers more insight into Ballerina’s type system.
In addition to primitive data types (int, float, string, boolean, etc.), Ballerina has non-primitive data types, such as arrays, tuples, maps, unions, tables, XML, JSON, any, and anydata. The value space of a union type is the union of the value spaces of its component types.
For example, a variable of the union type string|int can store either a string or an int, but it holds a value of only one of those types at any given time. The anydata type is used where you expect pure values other than errors. It is equivalent to the union of the following types:
type anydata = () | boolean | int | float | decimal | string | (anydata|error)[] | map<anydata|error> | xml | table
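The following minimal sketch makes the union and anydata descriptions concrete. It assumes a recent (Swan Lake) Ballerina release rather than the 1.2 release used in the rest of this article. The describe function is a hypothetical helper written for illustration. The type test `value is int` checks which member type the union variable currently holds, and a single anydata array holds pure values of several different shapes.

```ballerina
import ballerina/io;

// Hypothetical helper: reports which member type of the union the value holds.
function describe(string|int value) returns string {
    if (value is int) {
        return "int";
    }
    return "string";
}

public function main() {
    // The variable holds exactly one value of one member type at a time.
    string|int v = 42;
    io:println(describe(v));
    v = "forty-two";
    io:println(describe(v));

    // anydata accepts pure values of many shapes: ints, strings, floats,
    // booleans, nil, and nested maps.
    anydata[] values = [1, "two", 3.0, true, (), {"key": "value"}];
    io:println(values.length());
}
```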
Since Ballerina is specifically designed to write programs that work well by sharing data over the networks, it natively supports JSON and XML.
The XML type in Ballerina represents a sequence of zero or more XML items. Each item can be an element, a text, a comment, or a processing instruction.
xml bookXML = xml `<book>
<name>Sherlock Holmes</name>
<author>
<fname title="Sir">Arthur</fname>
<mname>Conan</mname>
<lname>Doyle</lname>
</author>
<bar:year xmlns:bar="http://ballerina.com/a">2019</bar:year>
<!--Price: $10-->
</book>`;
XML step expressions allow querying the children of an XML element or children of members of an XML sequence.
// Select all children items, which are XML elements.
xml allElementChildren = bookXML/<*>;
// Select all children items of `bookXML`.
xml allChildren = bookXML/*;
// Match first-level children with element name `author`.
// Then, match the second level with element name `fname`
// and retrieve the `title` attribute of `fname`.
var fnameTitle = bookXML/<author>/<fname>.title;
// Select all descendants that match element name `fname`.
xml fnameElements = bookXML/**/<fname>;
The following samples demonstrate XML support in Ballerina:
- XML Literal — Ballerina allows you to define XML as part of the language and syntactically validates it. You can insert expressions into the XML literal to pass values dynamically at runtime.
- XML Attributes — XML attribute access is now lax typed. This means that compile-time type checking is relaxed and moved to runtime.
- XML Namespaces — Ballerina has built-in support for defining and using XML namespaces.
- XML Functions — Ballerina supports various built-in functions to manipulate XML content.
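The XML Literal and step-expression features can be combined in a small sketch. It assumes a Swan Lake release of Ballerina, and the title and author variables are illustrative. Each ${...} is evaluated at runtime and inserted into the XML template. The step expression book/<name> then selects the <name> children of the constructed element.

```ballerina
import ballerina/io;

public function main() {
    string title = "Sherlock Holmes";
    string author = "Arthur Conan Doyle";

    // ${...} inserts the value of an expression into the XML template at runtime.
    xml book = xml `<book><name>${title}</name><author>${author}</author></book>`;

    // Step expression: select the <name> children of the <book> element.
    xml names = book/<name>;

    io:println(book);
    io:println(names);
    // data() returns the character data of the selected sequence.
    io:println(names.data());
}
```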
Because RESTful APIs use a simpler form of data exchange, JSON has become more popular than XML. JSON is a textual format for representing a collection of values: a simple value (string, number, “true”, “false”, “null”), an array of values, or an object.
Ballerina’s json type is designed for processing data expressed in JSON format. It is a built-in name for a union defined as follows:
type json = () | boolean | int | float | decimal | string | json[] | map<json>
Like XML, the json type is also defined to have lax static typing.
import ballerina/io;
json user = {
fname: "Lakmal",
lname: "Warusawithana",
address: {
line: "20 Palm Grove",
city: "Colombo 03",
country: "Sri Lanka"
}
};
public function main() {
io:println(user.address.city);
}
The following samples demonstrate JSON support in Ballerina:
- JSON — Basic JSON support
- JSON Objects — JSON objects are represented as maps of JSON in Ballerina.
- JSON Arrays — JSON array literals are written the same way as Ballerina arrays.
- JSON Access — Ballerina defines certain types as lax types for which static typing rules are less strict.
- JSON/Record/Map Conversion — Ballerina records, maps, and JSON objects are used to hold records. Records are collections of fields and each field value is accessed by a key. Converting from one type to another is very useful in certain situations.
- JSON to XML Conversion — JSON to XML conversions can be done using the xmlutils:fromJSON() function
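Two behaviors described above can be shown together: lax JSON access and JSON-to-record conversion. This is a minimal sketch assuming a Swan Lake release, where `cloneWithType` and `message()` are available. The Address record is a hypothetical type introduced only for this illustration. Because field access on json is lax, a missing field produces an error value at runtime instead of a compile-time failure. `cloneWithType` then checks the JSON shape against the record type before returning a typed value.

```ballerina
import ballerina/io;

// Hypothetical record type used only for this illustration.
type Address record {
    string city;
    string country;
};

public function main() returns error? {
    json user = {
        fname: "Lakmal",
        address: {city: "Colombo 03", country: "Sri Lanka"}
    };

    // Lax access: the static type is json|error, so a missing field
    // surfaces as an error value at runtime.
    json|error phone = user.phone;
    if (phone is error) {
        io:println("The phone field is missing: ", phone.message());
    }

    // Convert the nested JSON object into a typed record; `check` returns
    // the error early if the JSON shape does not match Address.
    json addressJson = check user.address;
    Address address = check addressJson.cloneWithType(Address);
    io:println(address.city);
}
```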
Network Data Binding
Network data binding automatically handles structural validation, data type handling, and payload passthrough operations. Ballerina supports data binding with different protocols, such as HTTP, NATS, and RabbitMQ.
HTTP data binding gives you access to the payload through the last parameter of the resource function signature. You declare that parameter’s name in the body field of the resource configuration annotation. Supported data types are string, json, xml, byte[], record, and record[].
The following code block shows how to bind a JSON payload.
import ballerina/http;
import ballerina/log;

service orders on new http:Listener(9090) {
    // Bind the request payload to the `orderDetails` parameter.
    @http:ResourceConfig {
        methods: ["POST"],
        body: "orderDetails"
    }
    resource function bindJson(http:Caller caller, http:Request req,
            json orderDetails) {
        // Lax access: `details` is of type json|error.
        var details = orderDetails.Details;
        http:Response res = new;
        if (details is json) {
            res.setPayload(<@untainted>details);
        } else {
            res.statusCode = 400;
            res.setPayload("Order Details unavailable");
        }
        var result = caller->respond(res);
        if (result is error) {
            log:printError(result.reason(), result);
        }
    }
}
In the same way, you can bind different data types. The Data Binding Ballerina by Example demonstrates a complete example with JSON, XML and Record type data bindings.
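Record binding can be sketched in the newer resource syntax. This assumes a Swan Lake release of the ballerina/http module, where the @http:Payload annotation marks the bound parameter. The Order type, the /orders path, port 9090, and the summarize helper are all hypothetical. Because Order is a closed record, the listener validates the structure of the incoming JSON body before the resource function runs. A non-matching body is typically rejected with a 400 response.

```ballerina
import ballerina/http;

// Hypothetical closed record describing the expected JSON body.
type Order record {|
    string id;
    int quantity;
|};

// Hypothetical helper that builds the response text for a bound Order.
function summarize(Order orderDetails) returns string {
    return string `Received ${orderDetails.quantity} item(s) for order ${orderDetails.id}`;
}

service /orders on new http:Listener(9090) {
    // @http:Payload binds the JSON request body to an Order record.
    resource function post .(@http:Payload Order orderDetails) returns string {
        return summarize(orderDetails);
    }
}
```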
Language-Integrated Query
Language-integrated query is a feature that lets you use a single syntax against multiple data sources. It helps break down a complex problem into a series of short, comprehensible queries.
Ballerina query expressions provide language-integrated query using SQL-like syntax. Unlike SQL statements, query expressions are type-safe, which helps you identify mistakes at design time. A query expression consists of a sequence of clauses: from, let, where, and select.
The input to each clause is a sequence of frames. As each clause executes, it iterates over its input frames and emits output frames, and the frames emitted by one clause are the input to the next. The final clause, the select clause, emits output values, and the query collects them into a value of one of the following basic types:
- List — the constructed list has a member for each emitted value
- Table — the constructed table has a row for each emitted value
- String — the constructed string is the concatenation of the emitted values
- XML — the constructed XML value is the concatenation of the emitted values
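A short sketch can show how the same input produces the list, string, and XML results described above. It assumes a Swan Lake release, where the contextually expected type of the query decides which basic type is constructed. The names array is illustrative.

```ballerina
import ballerina/io;

public function main() {
    string[] names = ["John", "Bill", "Sam"];

    // List: one member per emitted value.
    int[] lengths = from string name in names select name.length();

    // String: the emitted strings are concatenated.
    string initials = from string name in names select name.substring(0, 1);

    // XML: the emitted XML values are concatenated into one sequence.
    xml nameElements = from string name in names select xml `<name>${name}</name>`;

    io:println(lengths);
    io:println(initials);
    io:println(nameElements);
}
```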
The execution of a clause may complete early with an error value. In that case, this error value is the result of the query. Let's look at the following sample:
import ballerina/io;
type Student record {
string name;
int age;
string school;
};
type Result record {
string name;
string college;
float gpa;
string school;
};
public function main() {
map<float> gpa = {"John": 4.1, "Bill": 3.9, "Sam": 3.3, "Jennifer": 3.1};
Student s1 = {name: "John", age: 18, school: "Milpitas High School"};
Student s2 = {name: "Bill", age: 17, school: "San Jose High School"};
Student s3 = {name: "Sam", age: 18, school: "Clara Coutinho High School"};
Student s4 = {name: "Jennifer", age: 18, school: "Fremont Union High School"};
Student[] student = [];
student.push(s1);
student.push(s2);
student.push(s3);
student.push(s4);
Result[] stanford = from var candidate in student
let float cgpa = (gpa[candidate.name] ?: 0),
string targetCollege = "Stanford"
where cgpa > 3.8
select {
name: candidate.name,
college: targetCollege,
gpa: cgpa,
school: candidate.school
};
io:println(stanford);
}
$ ballerina run query_expression.bal
Compiling source
query_expression.bal
Running executables
name=John college=Stanford gpa=4.1 school=Milpitas High School name=Bill college=Stanford gpa=3.9 school=San Jose High School
The from clause works similarly to a foreach statement: it creates an iterator from an iterable value and then binds variables to each value returned by the iterator. The let clause binds additional variables. The where clause performs conditional execution and can refer to variables bound by the from and let clauses.
When the where condition evaluates to false, the iteration skips the following clauses. The select clause is evaluated for each iteration and the result of the query expression in this sample is a list whose members are the result of the select clause.
Like a query-expression, a query-action can be used with an iterable value:
error? result = from var student in studentList
where student.score > 1.0
do {
FullName fullName = {
firstName: student.firstName,
lastName: student.lastName
};
nameList.push(fullName);
};
For each input frame emitted by the from clause (student) that satisfies the where clause, the do {} block is executed with that frame’s variables in scope. If a clause completes early with an error, the result of the query action is that error. Otherwise, the result of the query action is nil.
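The query-action fragment above refers to studentList, FullName, and nameList without defining them. Here is a self-contained sketch that fills them in, assuming a Swan Lake release. The FullName record and the student data are illustrative, modeled on the stream example later in this article. Only the students whose score is above 1.0 reach the do block.

```ballerina
import ballerina/io;

type Student record {
    string firstName;
    string lastName;
    float score;
};

// Hypothetical record holding only the name fields.
type FullName record {
    string firstName;
    string lastName;
};

public function main() {
    Student[] studentList = [
        {firstName: "Alex", lastName: "George", score: 1.5},
        {firstName: "Ranjan", lastName: "Fonseka", score: 0.9},
        {firstName: "John", lastName: "David", score: 1.2}
    ];
    FullName[] nameList = [];

    // The do block runs once for each frame that passes the where clause.
    error? result = from var student in studentList
        where student.score > 1.0
        do {
            nameList.push({firstName: student.firstName, lastName: student.lastName});
        };

    if (result is error) {
        io:println("Query action failed: ", result.message());
    } else {
        io:println(nameList);
    }
}
```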
Pub/Sub and Streaming
Although microservices enable a more flexible and agile development process, architecting a reliable and scalable microservice-driven data pipeline is challenging. Handling stateful messages and streaming data also requires flexibility in the data types, structures, and operations that the language or framework supports.
Based on their capabilities, data streaming use cases can be grouped into three categories: publish/subscribe (pub/sub), store or persist, and real-time processing.
With pub/sub, producers broadcast data from a publishing service, making the stream available concurrently to multiple consumers. It provides decoupled communication: producers don’t know who the subscribers are, and consumers don’t know who the publishers are. Kafka and NATS are two popular distributed streaming platforms. NATS works well for smaller infrastructures, while Kafka is more mature and performs very well with huge data streams.
Ballerina has out-of-the-box support for Kafka. The following simple sample shows how to send messages to a Kafka topic using a kafka:Producer object:
import ballerina/io;
import ballerina/kafka;
kafka:ProducerConfiguration producerConfiguration = {
// The `bootstrapServers` is the list of remote server endpoints of the
// Kafka brokers.
bootstrapServers: "localhost:9092",
clientId: "basic-producer",
acks: "all",
retryCount: 3,
// Uses the builtin string serializer for the values.
valueSerializerType: kafka:SER_STRING,
// Uses the builtin int serializer for the keys.
keySerializerType: kafka:SER_INT
};
kafka:Producer kafkaProducer = new (producerConfiguration);
public function main() {
string message = "Hello World, Ballerina";
var sendResult = kafkaProducer->send(message, "test-kafka-topic", key = 1);
if (sendResult is error) {
io:println("Error occurred while sending data: " + sendResult.toString());
} else {
io:println("Message sent successfully.");
}
var flushResult = kafkaProducer->flushRecords();
if (flushResult is error) {
io:println("Error occurred while flushing the data: " + flushResult.toString());
} else {
io:println("Records were flushed successfully.");
}
}
The Transactional Producer Ballerina by Example shows how to send transactional messages to Kafka brokers atomically using the kafka:Producer object.
Other Ballerina by Examples for the Kafka module show its capabilities and use cases as a consumer.
As with Kafka, Ballerina has comprehensive support for NATS. The following Ballerina by Examples illustrate how you can use the ballerina/nats module:
- NATS Streaming Client to Publish Data
- Streaming Publisher and Subscriber With Data Binding
- Durable Subscriptions
- Queue Groups
- Historical Message Replay
In real-world applications, we need to work with larger datasets, and Ballerina inherently supports streaming them. For example, retrieving a large amount of data, converting it to a preferred data format (e.g., JSON or XML), and sending it across the network over HTTP does not require loading the entire dataset into memory at once. See the following code segment for an example:
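resource function getData(http:Caller caller, http:Request req) {
    http:Response res = new;
    var selectRet = testDB->select("SELECT * FROM Data", ());
    if (selectRet is table<record {}>) {
        // Convert the table to JSON without loading every row into memory.
        json jsonConversionRet = jsonutils:fromTable(selectRet);
        res.setPayload(<@untainted>jsonConversionRet);
    } else {
        res.statusCode = http:STATUS_INTERNAL_SERVER_ERROR;
        res.setPayload({"Error": "Error occurred while retrieving data " +
            "from the database"
        });
    }
    // Send the response; the payload is streamed to the client.
    var result = caller->respond(res);
    if (result is error) {
        log:printError(result.reason(), result);
    }
}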
In the code segment above, we invoke the testDB->select method to obtain a large amount of data and convert it to JSON. This conversion does not load all of the data into memory. res.setPayload sets the payload on the response, and the data is streamed to the client once the service is invoked. You can find the complete working code in the streaming a big dataset sample.
We can use data streaming over different protocols, such as HTTP and gRPC. The HTTP streaming sample shows HTTP input and output streaming through io:ReadableByteChannel, and the gRPC samples in Ballerina by Example demonstrate data streaming support in the gRPC module.
Stream Type
Ballerina has a built-in type called stream, which represents a sequence of values that may be constructed lazily and is used to iterate over that sequence. A value belongs to stream<T,C> if the values in the generated sequence all belong to T and the completion value belongs to C. The completion value is nil when the sequence completes successfully, or an error when it terminates because of an error. A stream can be iterated over at most once, and it supports two primitive operations: next and close.
type Student record {
string firstName;
string lastName;
float score;
};
Student s1 = {firstName: "Alex", lastName: "George", score: 1.5};
Student s2 = {firstName: "Ranjan", lastName: "Fonseka", score: 0.9};
Student s3 = {firstName: "John", lastName: "David", score: 1.2};
Student[] studentList = [s1, s2, s3];
// Iterable types can be converted to a stream.
stream<Student> studentStream = studentList.toStream();
The stream type provides methods similar to lists such as map, foreach, filter, reduce, and iterator. The following code block shows how to use filter and map methods against the above studentStream:
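// `Subscription` is assumed to be a record type with firstName, lastName,
// score, and degree fields, as defined in the complete sample.
// The `filter` and `map` functions return streams and work lazily.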
stream<Subscription> subscriptionStream = studentStream.filter(function (Student student) returns boolean {
return student.score > 1;
}).'map(function (Student student) returns Subscription {
Subscription subscription = {
firstName: student.firstName,
lastName: student.lastName,
score: student.score,
degree: "Bachelor of Medicine"
};
return subscription;
});
You can find the complete sample code in the streams BBE.
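The next operation described earlier can also be called directly. This is a minimal sketch assuming a Swan Lake release. The sumStream function is a hypothetical helper, and the sketch covers only next, not close. For a stream<int>, next() wraps each value in a record and returns nil, the completion value, once the sequence is exhausted. A second call after exhaustion also returns nil, because a stream can be iterated over at most once.

```ballerina
import ballerina/io;

// Hypothetical helper: sums a stream by calling next() until completion.
function sumStream(stream<int> numbers) returns int {
    int total = 0;
    while (true) {
        // next() returns record {| int value; |} while values remain, and
        // nil (the completion value of stream<int>) once they run out.
        record {| int value; |}? entry = numbers.next();
        if (entry is record {| int value; |}) {
            total += entry.value;
        } else {
            break;
        }
    }
    return total;
}

public function main() {
    int[] values = [10, 20, 30];
    stream<int> numberStream = values.toStream();
    io:println(sumStream(numberStream));
    // The stream has already been consumed, so next() now returns nil.
    io:println(numberStream.next() is ());
}
```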
From version 1.2.0 onward, the Ballerina JDBC and MySQL modules support streams in database operations. See the following code segment for an example:
// The result is returned as a stream and the elements of the stream can
// be either a record or an error.
stream<record{}, error> resultStream =
jdbcClient->query("Select * from Customers");
// If there is any error during the execution of the SQL query or
// iteration of the result stream, the result stream will terminate and
// return the error.
error? e = resultStream.forEach(function(record {} result) {
io:println("Full Customer details: ", result);
io:println("Customer first name: ", result["FIRSTNAME"]);
io:println("Customer last name: ", result["LASTNAME"]);
});
// Check and handle the error during the SQL query
// or iteration of the result stream.
if (e is error) {
io:println("ForEach operation on the stream failed!");
io:println(e);
}
More complex samples can be found in the JDBC query with complex types.
Summary
Every year, new programming languages, frameworks, and technologies emerge to optimize data processing in computer programs. However, for microservice development, where programs are distributed over the network, traditional languages have yet to address its unique needs.
In this article, we looked at how Ballerina specifically provides a unique developer experience when working with data in network-distributed programs.