Data Consistency in Distributed Systems: Transactional Outbox
In this article, we will discuss how to deal with consistency in microservice architecture using the transactional outbox pattern.
In today's world of distributed systems and microservices, maintaining data consistency is crucial. Microservice architecture is considered almost a standard for building modern, flexible, and reliable high-load systems, but it also introduces additional complexity.
Monolith vs Microservices
In monolithic applications, consistency can be achieved using transactions. Within a transaction, we can modify data in multiple tables. If an error occurs during the modification, the transaction is rolled back and the data remains consistent. Consistency is thus guaranteed by the database itself. In a microservice architecture, things get much more complicated: at some point, we have to change data not only in the current microservice but also in other microservices.
Imagine a scenario where a user interacts with a web application and creates an order on the website. When the order is created, the number of items in stock must be reduced. In a monolithic application, both changes could be made within a single database transaction, so the order is saved and the stock is reduced together or not at all.
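A minimal in-memory sketch of that all-or-nothing behavior is shown here. It is only an illustration: the class `MonolithShop`, its `orders` and `stock` collections, and the method `createOrder` are hypothetical stand-ins for real tables and a real database transaction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a monolith's database; all names are hypothetical.
final class MonolithShop {
    final List<String> orders = new ArrayList<>();        // plays the role of the orders table
    final Map<Integer, Integer> stock = new HashMap<>();  // productId -> items in stock

    // Applies both changes or neither, mimicking BEGIN ... COMMIT / ROLLBACK.
    boolean createOrder(String orderId, int productId, int quantity) {
        // Snapshot the current state so a failure can be rolled back.
        List<String> ordersBefore = new ArrayList<>(orders);
        Map<Integer, Integer> stockBefore = new HashMap<>(stock);
        try {
            orders.add(orderId);
            int available = stock.getOrDefault(productId, 0);
            if (available < quantity) {
                throw new IllegalStateException("Not enough items in stock");
            }
            stock.put(productId, available - quantity);
            return true; // "commit": both changes stay
        } catch (RuntimeException e) {
            // "rollback": restore both collections to the snapshot
            orders.clear();
            orders.addAll(ordersBefore);
            stock.clear();
            stock.putAll(stockBefore);
            return false;
        }
    }
}
```

With three items in stock, a first order for two items succeeds and leaves one item; a second order for two items fails, and the order record added at the start of that attempt is rolled back as well.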
In a microservice architecture, these tables may be owned by different microservices. When creating an order, we need to notify another service, for example via REST or Kafka. Many things can go wrong here: the request may fail, the network or the other microservice may be temporarily unavailable, or our microservice may stop right after creating a record in the orders table, so the message is never sent.
Transactional Outbox
One solution to this problem is the transactional outbox pattern. We create the order and a record in an outbox table within one transaction; the outbox record contains all the data needed for a future event. A dedicated handler reads this record and sends the event to another microservice. This way we ensure that the event will be sent if the order has been created successfully. If the network or the other microservice is unavailable, the handler keeps trying to send the message until it receives a successful response. The result is eventual consistency. Note that the receiving side must be idempotent, because in such architectures the same request may be delivered and processed more than once.
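The flow described above can be sketched with plain in-memory collections. This is a simplified illustration under stated assumptions, not the library used later in the article: `OutboxSketch`, `StockService`, `IdempotentStockService`, and their methods are hypothetical names, and in a real service the two writes in `createOrder` would happen in one database transaction.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

final class OutboxSketch {
    // Receiver of events; a call may fail when the network or the service is down.
    interface StockService {
        void reserve(String orderId);
    }

    final List<String> orders = new ArrayList<>();  // plays the role of the orders table
    final List<String> outbox = new ArrayList<>();  // pending events, keyed by order id

    // Step 1: store the order and the outbox record together
    // (a single database transaction in a real service).
    void createOrder(String orderId) {
        orders.add(orderId);
        outbox.add(orderId);
    }

    // Step 2: the handler tries to deliver every pending record.
    // A record is removed only after a successful call, so failures are retried on the next run.
    void relay(StockService target) {
        Iterator<String> pending = outbox.iterator();
        while (pending.hasNext()) {
            String orderId = pending.next();
            try {
                target.reserve(orderId);
                pending.remove();
            } catch (RuntimeException e) {
                // Delivery failed: keep the record for the next relay run.
            }
        }
    }
}

// Idempotent receiver: a repeated delivery of the same order id has no further effect.
final class IdempotentStockService implements OutboxSketch.StockService {
    final Set<String> processed = new HashSet<>();
    int reservations = 0;

    @Override
    public void reserve(String orderId) {
        if (processed.add(orderId)) { // add() returns false for an id seen before
            reservations++;
        }
    }
}
```

If the first `relay` call hits a failing receiver, the record stays in `outbox`; a later call delivers it, and an accidental duplicate delivery does not create a second reservation.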
Implementation
Let's consider an example implementation in a Spring Boot application. We will use a ready-made library, transaction-outbox.
First, let's start PostgreSQL in Docker:
docker run -d -p 5432:5432 --name db \
    -e POSTGRES_USER=admin \
    -e POSTGRES_PASSWORD=password \
    -e POSTGRES_DB=demo \
    postgres:12-alpine
Add a dependency to build.gradle:
implementation 'com.gruelbox:transactionoutbox-spring:5.3.370'
Declare the configuration:
@Configuration
@EnableScheduling
@Import({ SpringTransactionOutboxConfiguration.class })
public class TransactionOutboxConfig {

    @Bean
    public TransactionOutbox transactionOutbox(SpringTransactionManager springTransactionManager,
                                               SpringInstantiator springInstantiator) {
        return TransactionOutbox.builder()
                .instantiator(springInstantiator)
                .initializeImmediately(true)
                .retentionThreshold(Duration.ofMinutes(5))
                // interval between delivery attempts
                .attemptFrequency(Duration.ofSeconds(30))
                // stop retrying after this many failed attempts
                .blockAfterAttempts(5)
                .transactionManager(springTransactionManager)
                .persistor(Persistor.forDialect(Dialect.POSTGRESQL_9))
                .build();
    }
}
Here we specify how many attempts should be made if sending fails, the interval between attempts, and so on. For the outbox table to be processed in the background, we need to call outbox.flush() periodically. For this purpose, let's declare a component:
@Component
@AllArgsConstructor
public class TransactionOutboxWorker {

    private final TransactionOutbox transactionOutbox;

    // Runs 5 seconds after the previous flush has finished
    @Scheduled(fixedDelay = 5000)
    public void flushTransactionOutbox() {
        transactionOutbox.flush();
    }
}
The interval between flush calls should be chosen according to your requirements.
Now we can implement the method with business logic. We need to create an Order in the database and send the event to another microservice. For demonstration purposes, I will not implement the actual call but will simulate a sending error by throwing an exception. The method itself should be marked @Transactional, and the event should be sent not directly but through the TransactionOutbox object:
@Service
@AllArgsConstructor
@Slf4j
public class OrderService {

    private final OrderRepository repository;
    private final TransactionOutbox outbox;

    @Transactional
    public String createOrderAndSendEvent(Integer productId, Integer quantity) {
        String uuid = UUID.randomUUID().toString();
        repository.save(new OrderEntity(uuid, productId, quantity));
        // Not a direct call: the invocation is recorded in the outbox table
        // in the same transaction as the order
        outbox.schedule(getClass()).sendOrderEvent(uuid, productId, quantity);
        return uuid;
    }

    void sendOrderEvent(String uuid, Integer productId, Integer quantity) {
        log.info("Sending event for {}...", uuid);
        // Simulate a delivery failure in roughly half of the calls
        if (ThreadLocalRandom.current().nextBoolean()) {
            throw new RuntimeException("Simulated delivery failure");
        }
        log.info("Event sent for {}", uuid);
    }
}
Here the sendOrderEvent method randomly throws an exception. The key point, however, is that this method is not called directly: the call information is stored in the outbox table within the same transaction as the order. Let's start the service and send a request:
curl --header "Content-Type: application/json" \
    --request POST \
    --data '{"productId":"10","quantity":"2"}' \
    http://localhost:8080/order
{"id":"6a8e2960-8e94-463b-90cb-26ce8b46e96c"}
If the method is successful, the record is removed from the table, but if there is a problem, we can see the record in the table:
docker exec -ti <CONTAINER ID> bash
psql -U admin demo
psql (12.16)
Type "help" for help.
demo=# \x
Expanded display is on.
demo=# SELECT * FROM txno_outbox;
-[ RECORD 1 ]---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id | d0b69f7b-943a-44c9-9e71-27f738161c8e
invocation | {"c":"orderService","m":"sendOrderEvent","p":["String","Integer","Integer"],"a":[{"t":"String","v":"6a8e2960-8e94-463b-90cb-26ce8b46e96c"},{"t":"Integer","v":10},{"t":"Integer","v":2}]}
nextattempttime | 2023-11-19 17:59:12.999
attempts | 1
blocked | f
version | 1
uniquerequestid |
processed | f
lastattempttime | 2023-11-19 17:58:42.999515
Here we can see the parameters of the method call, the time of the next attempt, the number of attempts, etc. According to your settings, the handler will try to execute the request until it succeeds or until it reaches the limit of attempts. This way, even if our service restarts (which is considered normal for cloud-native applications), we will not lose important data about the external service call, and eventually the message will be delivered to the recipient.
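The bookkeeping visible in this record (attempts, nextattempttime, blocked) can be modeled with a few lines of code. This is a simplified model under stated assumptions, not the library's actual code: the class `OutboxEntry` and its methods are hypothetical, and the rule "block once the attempt count reaches the limit" is an assumption made for illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of one outbox record's retry state.
final class OutboxEntry {
    int attempts = 0;
    boolean blocked = false;
    Instant nextAttemptTime;

    OutboxEntry(Instant createdAt) {
        this.nextAttemptTime = createdAt; // eligible for delivery immediately
    }

    // Records a failed delivery attempt made at `now`.
    void recordFailure(Instant now, Duration attemptFrequency, int blockAfterAttempts) {
        attempts++;
        if (attempts >= blockAfterAttempts) {
            blocked = true; // assumed rule: no further automatic retries
        } else {
            nextAttemptTime = now.plus(attemptFrequency);
        }
    }

    // True when a flush at `now` should try this record again.
    boolean isDue(Instant now) {
        return !blocked && !now.isBefore(nextAttemptTime);
    }
}
```

With an attempt frequency of 30 seconds, a failure at 17:58:42 schedules the next attempt for 17:59:12, which matches the 30-second gap between lastattempttime and nextattempttime in the record above. With a limit of 5, the fifth failure marks the record as blocked in this model.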
Conclusion
Transactional outbox is a powerful solution for addressing data consistency issues in distributed systems. It provides a reliable and organized approach to managing transactions between microservices. This greatly reduces the risks associated with data inconsistency. We have examined the fundamental principles of the transactional outbox pattern, its implementation, and its benefits in maintaining a coherent and synchronized data state.
The project code is available on GitHub.