Implementing the Outbox Pattern
The Problem Statement
Microservices often need to publish events after performing a database transaction. Writing to the database and publishing the event are two separate operations against two different systems, yet they must behave atomically: a failure to publish an event after a successful write can mean a critical failure of the business process.
To illustrate the problem, consider a Student microservice that enrolls students. After enrollment, a "Course Catalog" service emails the student all the available courses. In an event-driven application, the Student microservice enrolls the student by inserting a record into the database and then publishes an event stating that the enrollment is complete. The "Course Catalog" service listens to this event and performs its actions. In a failure scenario, if the Student microservice goes down after inserting the record but before publishing the event, the system is left in an inconsistent state.
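To see the failure window concretely, here is a minimal sketch of the naive dual write (the names kafkaTemplate, toEntity, and toJson are hypothetical):
public void enroll(EnrollStudentDTO student) {
    // 1. Local database transaction commits.
    studentRepository.save(toEntity(student));
    // <-- If the service crashes at this point, the student is saved
    //     but the event below is never published.
    // 2. Publish to a completely separate system.
    kafkaTemplate.send("student_enrolled", toJson(student));
}
Reversing the order doesn't help: then a crash (or rollback) after the send leaves downstream services reacting to an enrollment that never happened.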
The Outbox Pattern
This pattern provides an effective solution for publishing events reliably. The idea is to have an "Outbox" table in the service's database. When a request for enrollment is received, not only is a record inserted into the Student table, but a record representing the event is also inserted into the Outbox table, and the two database actions are done as part of the same transaction.
An asynchronous process monitors the Outbox table for new entries and, if there are any, publishes the events to the event bus. The pattern splits the original dual write into two steps owned by different components, which is what makes it reliable.
A description of this pattern can be found on Chris Richardson's excellent microservices.io site. As described there, there are two approaches to implementing the Outbox pattern: transaction log tailing and polling publisher. We will use the log-tailing approach in the solution below.
Transaction log tailing can be implemented in a very elegant and efficient way using Change Data Capture (CDC) with Debezium and Kafka-Connect.
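For contrast, the polling-publisher alternative mentioned above can be approximated with a scheduled job that drains the Outbox table. A minimal sketch, assuming Spring scheduling and a KafkaTemplate (both hypothetical here):
@Scheduled(fixedDelay = 500)
public void publishPendingEvents() {
    // Read every event still sitting in the outbox and push it to Kafka.
    for (OutBoxEntity event : outBoxRepository.findAll()) {
        kafkaTemplate.send(event.getEventType().toLowerCase(), event.getPayload());
        // Deleting after the send gives at-least-once semantics: a crash
        // between send and delete re-publishes the event on the next run.
        outBoxRepository.delete(event);
    }
}
The CDC approach below avoids this polling loop entirely by reading the database's own transaction log.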
Outbox Pattern With Kafka Connect
Solution Design
The Student microservice exposes endpoints to perform database operations on the domain. The microservice uses a Postgres database, which houses two tables, "Student" and "Outbox". The transactional operations modify or insert into the "Student" table and add a record to the "Outbox" table.
The Kafka-Connect framework runs as a separate service alongside the Kafka broker. The Debezium connector for Postgres is deployed on the Kafka-Connect runtime to capture the changes on the database. In our example, a custom transformer is also deployed within Kafka-Connect to route each event to the right Kafka topic.
The Debezium connector tails the database transaction log (write-ahead log) for changes to the "Outbox" table and publishes an event to the topic determined by the custom transformer.
This solution guarantees at-least-once delivery, since the Kafka Connect service ensures that each connector is always running; however, the same event can be published more than once, for example between a connector going down and starting back up. To achieve effectively exactly-once processing, the consuming client must be idempotent, making sure duplicate events aren't processed again.
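One common way to make the consumer idempotent is to record the IDs of processed events and skip duplicates. The sketch below assumes a Spring Kafka listener and a hypothetical processedEventRepository; the event ID is the outbox UUID, which the custom transformer (shown later) places in the message key:
@KafkaListener(topics = "student_enrolled")
public void onStudentEnrolled(ConsumerRecord<String, String> record) {
    String eventId = record.key(); // the outbox UUID
    // Skip events that were already handled (duplicate delivery).
    if (processedEventRepository.existsById(eventId)) {
        return;
    }
    emailAvailableCourses(record.value()); // business action (hypothetical)
    processedEventRepository.save(new ProcessedEvent(eventId));
}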
Understanding the Code
You can find the code here. I would encourage you to read through it, since I walk through some key implementation details and the limitations of this pattern.
Student Microservice
This is a simple Spring-Boot microservice that exposes three endpoints via a REST controller and uses Spring-JPA for the database actions: a GET for fetching student information, a POST for creating (enrolling) a student, and a PUT for updating the student's email address. The POST and the PUT generate the events 'Student Enrolled' and 'Student Email Changed'. The code that invokes the database actions and inserts the event lives in the service class.
@Transactional
public StudentDTO enrollStudent(EnrollStudentDTO student) throws Exception {
    log.info("Enroll Student details for StudentId: {}", student.getName());
    StudentEntity studentEntity = StudentMapper.INSTANCE.studentDTOToEntity(student);
    studentRepository.save(studentEntity);
    // Publish the event within the same transaction.
    event.fire(EventUtils.createEnrollEvent(studentEntity));
    return StudentMapper.INSTANCE.studentEntityToDTO(studentEntity);
}
...
public static OutboxEvent createEnrollEvent(StudentEntity studentEntity) {
    ObjectMapper mapper = new ObjectMapper();
    JsonNode jsonNode = mapper.convertValue(studentEntity, JsonNode.class);
    return new OutboxEvent(
            studentEntity.getStudentId(),
            "STUDENT_ENROLLED",
            jsonNode
    );
}
The method needs the @Transactional annotation so that the database action and the event write are bound in a single transaction. enrollStudent() creates a new record in the Student table and then fires an event using Spring's ApplicationEventPublisherAware support. createEnrollEvent() builds the data to be inserted into the Outbox. Inserting the event into the 'Outbox' table is handled in the EventService class, which uses a Spring-JPA repository for the database interactions.
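As an aside, the event.fire(...) call above is a thin wrapper over Spring's application events; a sketch of what it might look like (the class name EventPublisher is an assumption):
@Component
public class EventPublisher implements ApplicationEventPublisherAware {

    private ApplicationEventPublisher publisher;

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    // Spring delivers the event synchronously, so the @EventListener
    // below executes inside the caller's transaction.
    public void fire(OutboxEvent event) {
        publisher.publishEvent(event);
    }
}
With that wrapper in place, the EventService persists the event: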
@EventListener
public void handleOutboxEvent(OutboxEvent event) {
    UUID uuid = UUID.randomUUID();
    OutBoxEntity entity = new OutBoxEntity(
            uuid,
            event.getAggregateId(),
            event.getEventType(),
            event.getPayload().toString(),
            new Date()
    );
    log.info("Handling event : {}.", entity);
    outBoxRepository.save(entity);
    /*
     * Delete the event once written, so that the outbox doesn't grow.
     * The CDC eventing polls the database log entry and not the table
     * in the database.
     */
    outBoxRepository.delete(entity);
}
A key thing to note here is that the code deletes the record from the 'Outbox' table as soon as it has been written, so that the table doesn't grow. Debezium doesn't examine the actual contents of the table; instead, it tails the write-ahead transaction log. Once the transaction commits, the calls to save() and delete() produce a CREATE and a DELETE entry in the log, and the Kafka-Connect custom transformer can be programmed to take no action on the DELETE entry.
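For reference, the OutBoxEntity persisted above might look like the following; the field names and types are inferred from the constructor call, so treat this as a sketch:
@Entity
@Table(name = "outbox")
public class OutBoxEntity {

    @Id
    private UUID uuid;          // also becomes the Kafka message key
    private String aggregateId; // the student the event belongs to (type assumed)
    private String eventType;   // e.g. STUDENT_ENROLLED
    private String payload;     // the event body, serialized as JSON
    private Date createdOn;     // insertion timestamp (name assumed)

    // Constructor, getters and setters omitted.
}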
Custom Debezium Transformer
This component determines the Kafka topic to which each event is published, using the EVENT_TYPE column of the payload from the 'Outbox' table. The component is built as a JAR and placed in the Kafka-Connect runtime; this setup is handled by the Dockerfile.
FROM debezium/connect
ENV DEBEZIUM_DIR=$KAFKA_CONNECT_PLUGINS_DIR/debezium-transformer
RUN mkdir $DEBEZIUM_DIR
COPY target/custom-debezium-transformer-0.0.1.jar $DEBEZIUM_DIR
We use the debezium/connect image, since it comes preloaded with all available connectors. For installing a particular connector, you can refer to the documentation here. The component consists of just one class that determines the topic before the message is published.
public class CustomTransformation<R extends ConnectRecord<R>> implements Transformation<R> {

    /**
     * This method is invoked when a change is made on the outbox schema.
     */
    @Override
    public R apply(R sourceRecord) {
        Struct kStruct = (Struct) sourceRecord.value();
        String databaseOperation = kStruct.getString("op");

        // Handle only the CREATEs ('c'); DELETEs pass through untouched.
        if ("c".equalsIgnoreCase(databaseOperation)) {
            // Pull the outbox columns out of the "after" image.
            Struct after = (Struct) kStruct.get("after");
            String uuid = after.getString("uuid");
            String payload = after.getString("payload");
            String eventType = after.getString("event_type");
            // The target topic is the lower-cased event type.
            String topic = eventType.toLowerCase();

            Headers headers = sourceRecord.headers();
            headers.addString("eventId", uuid);

            // Re-route the record: the outbox UUID becomes the message key
            // and the JSON payload becomes the message value.
            sourceRecord = sourceRecord.newRecord(topic, null, Schema.STRING_SCHEMA, uuid,
                    null, payload, sourceRecord.timestamp(), headers);
        }
        return sourceRecord;
    }

    // The config(), configure() and close() overrides are omitted for brevity.
}
The transformer implements the Kafka-Connect Transformation interface. The apply() method filters for the CREATE operation ('c'), skipping the DELETEs, as explained above. For every CREATE, the topic name is identified and the payload is returned. For simplicity in this example, the topic name is the lowercase value of the EVENT_TYPE column inserted into the "Outbox" table by the Student microservice.
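To sanity-check the routing, the transformer can be exercised directly with a hand-built record. The sketch below fakes a simplified Debezium-style envelope (an op field plus an after image); the field values are made up:
// Schema of the "after" image of an outbox row.
Schema afterSchema = SchemaBuilder.struct()
        .field("uuid", Schema.STRING_SCHEMA)
        .field("payload", Schema.STRING_SCHEMA)
        .field("event_type", Schema.STRING_SCHEMA)
        .build();
// Simplified Debezium change-event envelope.
Schema envelopeSchema = SchemaBuilder.struct()
        .field("op", Schema.STRING_SCHEMA)
        .field("after", afterSchema)
        .build();
Struct value = new Struct(envelopeSchema)
        .put("op", "c")
        .put("after", new Struct(afterSchema)
                .put("uuid", "7f000001-0000-0000-0000-000000000001")
                .put("payload", "{\"name\":\"Megan Clark\"}")
                .put("event_type", "STUDENT_ENROLLED"));
SourceRecord raw = new SourceRecord(
        Collections.emptyMap(), Collections.emptyMap(),
        "pg-outbox-server.public.outbox", envelopeSchema, value);

// The transformer re-routes the record to the "student_enrolled" topic,
// with the outbox UUID as the key and the JSON payload as the value.
SourceRecord routed = new CustomTransformation<SourceRecord>().apply(raw);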
Installation of the Needed Images and Frameworks
The guide assumes you have Docker pre-installed; you can follow the installation steps here. Create the custom Debezium Connect image by triggering a Maven build on the custom-debezium-connect project and building the Docker image.
mvn clean install
docker build -t custom-debezium-connect .
Running the Docker Compose under the project folder installs all the pre-requisites: Zookeeper, Kafka, Postgres, and Kafka-Connect. The Docker Compose file:
version: "3.5"
services:
  # Install postgres and set up the student database.
  postgres:
    container_name: postgres
    image: debezium/postgres
    ports:
      - 5432:5432
    environment:
      - POSTGRES_DB=studentdb
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
  # Install zookeeper.
  zookeeper:
    container_name: zookeeper
    image: zookeeper
    ports:
      - 2181:2181
  # Install kafka and create the needed topics.
  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka
    hostname: kafka
    ports:
      - 9092:9092
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://kafka:29092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CREATE_TOPICS: "student_email_changed:1:1,student_enrolled:1:1"
    depends_on:
      - zookeeper
  # Install debezium-connect.
  debezium-connect:
    container_name: custom-debezium-connect
    image: custom-debezium-connect
    hostname: debezium-connect
    ports:
      - '8083:8083'
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_connect_config
      OFFSET_STORAGE_TOPIC: debezium_connect_offsets
      STATUS_STORAGE_TOPIC: debezium_connect_status
      BOOTSTRAP_SERVERS: kafka:29092
    depends_on:
      - kafka
      - postgres
We use the debezium/postgres image, because it comes prebuilt with the logical decoding feature. This is the mechanism that allows extraction of the changes committed to the transaction log, making CDC possible. The documentation for installing the plugin on a plain Postgres can be found here.
Setting Up the Kafka Topics
Execute the commands below to create the two Kafka topics, "student_enrolled" and "student_email_changed":
docker exec -t kafka /usr/bin/kafka-topics \
--create --bootstrap-server :9092 \
--topic student_email_changed \
--partitions 1 \
--replication-factor 1
docker exec -t kafka /usr/bin/kafka-topics \
--create --bootstrap-server :9092 \
--topic student_enrolled \
--partitions 1 \
--replication-factor 1
Linking the Debezium Kafka Connect With the Outbox Table
Execute the curl command below to create a connector on the Kafka-Connect server. The connector points to the Postgres installation and specifies the table to capture as well as the custom transformer class we built earlier.
curl -X POST \
  http://localhost:8083/connectors/ \
  -H 'content-type: application/json' \
  -d '{
    "name": "student-outbox-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "1",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "user",
      "database.password": "password",
      "database.dbname": "studentdb",
      "database.server.name": "pg-outbox-server",
      "tombstones.on.delete": "false",
      "table.whitelist": "public.outbox",
      "transforms": "outbox",
      "transforms.outbox.type": "com.sohan.transform.CustomTransformation"
    }
  }'
That completes the setup: Zookeeper running on port 2181, Kafka on port 9092 with all the needed topics, Postgres on port 5432 with the 'studentdb' database pre-created, and finally Kafka-Connect with Debezium and our custom transformer on port 8083.
Running the Solution
Once the Student microservice is started, we can see the pattern in action. To simulate a student enrollment, we can execute the curl command below.
curl -X POST \
'http://localhost:8080/students/~/enroll' \
-H 'content-type: application/json' \
-d '{
"name": "Megan Clark",
"email": "mclark@gmail.com",
"address": "Toronto, ON"
}'
We see that a new student record is inserted into the database for ‘Megan Clark’.
And we see an event published to the topic student_enrolled, notifying the downstream systems that 'Megan Clark' has enrolled.
To simulate a student updating their email address, we can execute the curl command below.
curl -X PUT \
  http://localhost:8080/students/1/update-email \
  -H 'content-type: application/json' \
  -d '{ "email": "jsmith@gmail.com" }'
We can see that the email has been changed to 'jsmith@gmail.com'.
And we see an event published to the topic student_email_changed, notifying the downstream systems that the student with Student-ID '1' has changed their email address.
If we comment out the line of code in the EventService that deletes outbox events after writing them (outBoxRepository.delete(entity)), we can view the events inserted into the outbox table.
Summary
In a microservice architecture, system failures are inevitable. Adopting this architecture style forces us to design for failure. The Outbox Pattern gives us a robust way of messaging reliably in the face of failure.
The above solution keeps the implementation of the pattern simple, but to make the system highly available we must run multiple instances (clusters) of Zookeeper, Apache Kafka, and Kafka Connect.
Finally, I would like to point out that this isn't the only way to tackle the problem of reliable messaging, but it is an invaluable pattern to have at your disposal.