How To Get Closer to Consistency in Microservice Architecture
Some scenarios in microservice-based architecture involve multiple microservices. Read how completion of the scenario depends on the task completion of each microservice.
When it comes to transactions, the first thing that comes to mind is ACID, which stands for four important properties:
- Atomicity: Each transaction (a set of statements that read, write, update, or delete data) is treated as a single unit: either all of its statements are executed, or none of them are. This property prevents data loss and corruption if, for example, your streaming data source fails mid-stream.
- Consistency: Ensures that transactions only make changes to tables in predefined, predictable ways. Transactional consistency ensures that corruption or errors in your data do not create unintended consequences for the integrity of your table.
- Isolation: When multiple users read and write the same table at the same time, isolation ensures that their concurrent transactions don't interfere with or affect one another. Each transaction behaves as though transactions were executed one by one, even though they actually run concurrently.
- Durability: Ensures that changes to your data made by successfully executed transactions will be saved, even in the event of system failure.
ACID makes sense when every system has its own database and all of the code lives in a single module. In a microservice-based architecture, however, we have multiple loosely coupled modules, each with its own database, its own transactions, and of course its own ACID guarantees, yet these modules are connected. Some scenarios involve multiple microservices, and the completion of the scenario depends on each of these microservices completing its task. Ideally, all microservices perform their tasks correctly; but if one microservice fails, the whole scenario must fail. Conceptually, we need a global transaction that spans multiple independent microservices.
For better understanding, let's look at a real-world scenario. This scenario is a regular microservice-based ordering system that contains the following microservices:
1- OrderService, which is responsible for registering a customer's order.
2- PaymentService, which is responsible for collecting the order payment from the customer.
3- RestaurantService, which is responsible for customer order preparation.
4- DeliveryService, which puts orders in the delivery queue.
Suppose that, in this simple scenario, the customer places a food order through OrderService; this microservice receives the order information from the customer and stores it in its own database. The next step is the payment process, which is performed by PaymentService. This microservice receives the customer's card information and makes the payment. If everything is correct and the payment succeeds, the customer's order can be completed. But what happens if there is not enough money on the customer's card, or the payment gateway is unreachable? We have an unsuccessful payment but still a pending order in OrderService, so we need a way to cancel that pending order in the OrderService database. Put simply, we need a transaction that spans microservices.
Distributed Transactions
There are several ways to manage transactions in a microservice-based architecture; the most prominent is distributed transactions. A distributed transaction performs operations on data across multiple databases. Distributed transactions can be handled in two ways: coordinated across multiple separate nodes connected through a network, or on a single server that contains multiple databases.
Two-Phase Commit
A two-phase commit (2PC) is the most famous protocol of distributed transactions. In implementing this protocol, we assume that one process will function as the coordinator and the rest as cohorts (the coordinator may be the one that initiated the transaction, but that's not necessary). We further assume that there is stable storage and a write-ahead log at each site. Furthermore, we assume that no machine involved crashes forever. The protocol works as follows (the coordinator is ready to commit and needs to ensure that everyone else will do so as well):
Phase | Coordinator | Cohort
Phase 1 | Write "prepare to commit" message to the log | Work on the transaction; when done, wait for the prepare message
Phase 1 | Send "prepare to commit" message |
Phase 1 | Wait for reply | Receive message; when the transaction is ready to commit, write "agree to commit" or "abort" to the log
Phase 1 | | Send "agree" or "abort" reply
Phase 2 | Write commit message to the log | Wait for commit message
Phase 2 | Send commit or abort message | Receive commit or abort message
Phase 2 | Wait for all cohorts to respond | If commit was received, write "commit" to the log, release all locks, and update the database; if abort was received, undo all changes
Phase 2 | | Send done message
Phase 2 | Clean up all state. Done. |
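The two phases in the table can be sketched in a single process. The following Java is a minimal illustration, not a production protocol: Cohort, InMemoryCohort, and TwoPhaseCoordinator are hypothetical names, the string lists stand in for the write-ahead logs, and networking, timeouts, and crash recovery are deliberately omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical participant contract for two-phase commit.
interface Cohort {
    boolean prepare(String txId); // phase 1: vote "agree" (true) or "abort" (false)
    void commit(String txId);     // phase 2: make the prepared changes permanent
    void abort(String txId);      // phase 2: undo the prepared changes
}

// In-memory cohort that records its actions; the list stands in for its write-ahead log.
class InMemoryCohort implements Cohort {
    final String name;
    final boolean willAgree;
    final List<String> log = new ArrayList<>();

    InMemoryCohort(String name, boolean willAgree) {
        this.name = name;
        this.willAgree = willAgree;
    }

    public boolean prepare(String txId) {
        log.add((willAgree ? "agree " : "abort ") + txId);
        return willAgree;
    }

    public void commit(String txId) { log.add("commit " + txId); }

    public void abort(String txId) { log.add("rollback " + txId); }
}

class TwoPhaseCoordinator {
    final List<String> log = new ArrayList<>();

    // Returns true only if every cohort voted to commit.
    boolean run(String txId, List<Cohort> cohorts) {
        log.add("prepare " + txId);
        boolean allAgreed = true;
        for (Cohort c : cohorts) {                 // phase 1: collect votes
            if (!c.prepare(txId)) {
                allAgreed = false;                 // one "abort" vote decides the outcome
                break;
            }
        }
        log.add((allAgreed ? "commit " : "abort ") + txId); // log the decision before sending it
        for (Cohort c : cohorts) {                 // phase 2: broadcast the decision to all cohorts
            if (allAgreed) c.commit(txId); else c.abort(txId);
        }
        log.add("done " + txId);
        return allAgreed;
    }
}
```

Even in this toy form, the coordinator is the only component that knows the outcome, and every cohort waits on it between the two phases; that is the single point of failure and latency coupling discussed next.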
Two-phase commit and similar protocols are not good options for transactions in a microservice-based architecture, for several reasons. The most important is that one node is responsible for coordinating the transaction across all other nodes. This is an obvious single point of failure: if the coordinator node has a problem, the whole transaction management system is in trouble. The second important problem is that the response time of the transaction depends on the response time of the slowest node. Other problems with 2PC include:
- It needs at least 4n messages per transaction, and with retries the message count can grow to O(n^2)
- Locks held between the two phases reduce throughput
- Many NoSQL databases and message brokers do not support it (so far)
- It reduces Availability in terms of the CAP theorem
SAGA
The SAGA pattern was first introduced in 1987 by Hector Garcia-Molina and Kenneth Salem in a paper titled "Sagas." The idea behind it is quite simple: a saga is a sequence of transactions that can be interleaved with other transactions. Microservice-based architectures generally consist of small services, each with its own transactions, and SAGA provides the mechanism for combining these small transactions. The SAGA pattern manages a global transaction as a sequence of local transactions, each of which participates in the SAGA as a single, independent unit of work. The key point is that SAGA guarantees that either all operations complete successfully or the local transactions that did complete are rolled back. In regular RDBMS-based systems, commit and rollback happen automatically. In SAGA, however, the situation is different: every participant in the SAGA ecosystem must manage its own commit and rollback. In practice, every microservice involved in a SAGA should expose two services: first, a service that performs the main functionality the microservice contributes; and second, a rollback service, called the compensation service, that reverses the first service's changes in the local database. For example, in OrderService the main service is CREATE ORDER, and the compensation service is CANCEL ORDER. A SAGA-based transaction therefore consists of n transactions and n compensations. This ordering system is the case study of this article. As you can see in the picture below, we have four microservices, so we have four compensation services in addition to the four main transactions.
Assumption 1: For n > 0, if T(n+1) fails, then T(1)..T(n) must fail as well; that is, the compensations C(n)..C(1) must all be called, in reverse order.
Assumption 2: Compensation services must be idempotent and cannot abort; they must be retried until they succeed.
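The two assumptions can be expressed as a small in-process saga runner. This is a sketch under stated assumptions, not a framework API: SagaStep and Saga are hypothetical names, each local transaction T(i) is modeled as a Supplier<Boolean> that returns false on failure, and each compensation C(i) is a Runnable that is retried until it stops throwing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// One saga step: a local transaction T(i) and its compensation C(i).
final class SagaStep {
    final String name;
    final Supplier<Boolean> action; // T(i): returns false if the local transaction fails
    final Runnable compensation;    // C(i): must be idempotent (assumption 2)

    SagaStep(String name, Supplier<Boolean> action, Runnable compensation) {
        this.name = name;
        this.action = action;
        this.compensation = compensation;
    }
}

final class Saga {
    // Runs T(1)..T(n). If T(k+1) fails, runs C(k)..C(1) in reverse order (assumption 1).
    static boolean execute(List<SagaStep> steps) {
        List<SagaStep> completed = new ArrayList<>();
        for (SagaStep step : steps) {
            if (step.action.get()) {
                completed.add(step);
                continue;
            }
            // The failed step committed nothing, so only completed steps are compensated.
            for (int i = completed.size() - 1; i >= 0; i--) {
                compensateUntilSuccess(completed.get(i));
            }
            return false;
        }
        return true;
    }

    // Compensations cannot abort, so they are retried until they succeed (assumption 2).
    private static void compensateUntilSuccess(SagaStep step) {
        while (true) {
            try {
                step.compensation.run();
                return;
            } catch (RuntimeException e) {
                // A real system would back off (and alert) before retrying.
            }
        }
    }
}
```

For the ordering system, the steps would be CREATE ORDER / CANCEL ORDER followed by the payment step and its compensation; if the payment step returns false, only CANCEL ORDER runs.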
SAGA Implementation Strategies
A SAGA is usually implemented in one of two ways:
Orchestration-Based SAGA
In this approach, there is one coordinator component, which can be either a participating microservice or an independent component. The orchestrator (an object) tells the participants which local transactions to execute. Consider this scenario:
- OrderService receives the POST /order request and creates an order through the SAGA orchestrator.
- The SAGA orchestrator creates an Order in the PENDING state.
- It then sends a command to PAYMENT SERVICE.
- PAYMENT SERVICE attempts to pay the order amount through the payment gateway.
- Whether the payment succeeds or fails, PAYMENT SERVICE sends back a reply message indicating the outcome.
- Based on that reply, the SAGA orchestrator tells ORDER SERVICE to approve or reject the Order.
- If the payment succeeded, the coordinator tells RESTAURANT SERVICE to start its transaction and tells ORDER SERVICE to change the Order status to PAID.
- If the payment failed, the coordinator tells ORDER SERVICE to change the Order status to FAILED.
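The scenario above can be sketched with the orchestrator owning the whole control flow. The participant interfaces and OrderSagaOrchestrator are hypothetical names; in a real system each call would be a command message or a remote request, and the orchestrator would have to persist its own progress.

```java
// Hypothetical participant interfaces; in a deployment these would be remote services.
interface OrderParticipant {
    void createPending(String orderId);
    void markPaid(String orderId);
    void markFailed(String orderId);
}

interface PaymentParticipant {
    boolean pay(String orderId, double amount); // the reply: true = paid, false = failed
}

interface RestaurantParticipant {
    void prepare(String orderId);
}

// The orchestrator decides every next step; participants never talk to each other.
final class OrderSagaOrchestrator {
    private final OrderParticipant orders;
    private final PaymentParticipant payments;
    private final RestaurantParticipant restaurant;

    OrderSagaOrchestrator(OrderParticipant orders, PaymentParticipant payments,
                          RestaurantParticipant restaurant) {
        this.orders = orders;
        this.payments = payments;
        this.restaurant = restaurant;
    }

    String placeOrder(String orderId, double amount) {
        orders.createPending(orderId);            // Order starts in PENDING
        if (payments.pay(orderId, amount)) {      // command to, and reply from, the payment side
            orders.markPaid(orderId);
            restaurant.prepare(orderId);
            return "PAID";
        }
        orders.markFailed(orderId);               // compensates the pending order
        return "FAILED";
    }
}
```

Because every step goes through placeOrder, a crash of the orchestrator halts all in-flight sagas, which is the weakness described next.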
This approach, as you can guess, has a well-known problem that we all fear: a single point of failure. If the coordinator fails, the whole ecosystem fails.
Choreography-Based SAGA
Unlike the previous approach, this protocol has no single point of failure. In choreography-based SAGA, each local transaction publishes domain events that trigger local transactions in other services.
Practical Choreography-Based Saga With a Simplifying Assumption
The ideal implementation of SAGA is the choreography-based protocol. This approach relies on message brokers and event handlers to achieve the desired result. The message broker we use is Apache Kafka, a distributed, open-source event streaming platform used by many companies and software architectures. Kafka was initially conceived as a messaging queue and is based on the abstraction of a distributed commit log. Since it was created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue into a full-fledged event streaming platform. The most important data structure in Kafka is the topic. Simply put, a topic can be thought of as a queue of events: consumers listen to it to be notified of new events, and producers put new events into it.
Let's simulate our case study with choreography, with the help of Kafka.
Simplifying assumption: In this part, we assume that there are only two microservices: OrderService and PaymentService.
Assumption: There is a customer Order object with three possible statuses: PENDING, PAID, or FAILED.
- The customer sends his order to OrderService.
- OrderService processes the order, sets its status to PENDING, and puts it in a topic (PendingOrderTopic). Immediately after that, PaymentService, which listens to this topic, receives the order and tries to pay through a payment gateway such as PayPal.
- If the payment is successful, the state of the order is set to PAID and sent to the restaurant to prepare the order.
- However, if the payment is failed for any reason, the order state changes to FAILED and is placed in the topic with the name (FailedPaymentTopic). Finally, the OrderService which listens to this topic receives this event and notifies the customer.
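The flow above can be simulated without Kafka to show that no component coordinates it: each service only reacts to the topic it subscribes to. EventBus, DemoOrder, ChoreographyDemo, and the SuccessPaymentTopic name are hypothetical, and the credit-limit check merely stands in for the payment gateway. Unlike Kafka, this bus is synchronous and in-memory, with no persistence, partitions, or consumer groups.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy, synchronous stand-in for Kafka topics: publish() calls every subscriber in order.
final class EventBus {
    private final Map<String, List<Consumer<DemoOrder>>> subscribers = new HashMap<>();

    void subscribe(String topic, Consumer<DemoOrder> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    void publish(String topic, DemoOrder order) {
        subscribers.getOrDefault(topic, List.of()).forEach(h -> h.accept(order));
    }
}

final class DemoOrder {
    final String id;
    final double amount;
    String status = "PENDING";

    DemoOrder(String id, double amount) {
        this.id = id;
        this.amount = amount;
    }
}

final class ChoreographyDemo {
    static final String PENDING = "PendingOrderTopic";
    static final String FAILED = "FailedPaymentTopic";
    static final String PAID = "SuccessPaymentTopic"; // hypothetical name for the success path

    // Services are wired together only through topics; there is no coordinator.
    static List<String> run(double creditLimit, DemoOrder... orders) {
        EventBus bus = new EventBus();
        List<String> notifications = new ArrayList<>();

        // PaymentService: consumes pending orders and publishes the payment outcome.
        bus.subscribe(PENDING, o -> {
            if (o.amount <= creditLimit) {
                o.status = "PAID";
                bus.publish(PAID, o);
            } else {
                o.status = "FAILED";
                bus.publish(FAILED, o);
            }
        });
        // OrderService: consumes failed payments and notifies the customer.
        bus.subscribe(FAILED, o -> notifications.add("payment failed for " + o.id));
        // Stand-in for the restaurant side of the success path.
        bus.subscribe(PAID, o -> notifications.add("prepare " + o.id));

        for (DemoOrder o : orders) {
            bus.publish(PENDING, o); // OrderService publishes each new order
        }
        return notifications;
    }
}
```

With a credit limit of 100, an order of 50 ends up PAID and reaches the restaurant, while an order of 500 ends up FAILED and only OrderService hears about it.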
Invariant
The main consistency invariant in our case study is: an order is successful if and only if sum(order.amount) <= customer.credit, where the sum is taken over the customer's successful orders. There must be no path through the system that violates it.
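One way to keep the invariant from being violated is to enforce it at the single point where an order becomes successful. CustomerCredit below is a hypothetical, single-process sketch; in the distributed system the check would live in whichever service owns the customer's credit, which the article does not specify. A production version would also use an exact decimal type for money.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical credit account enforcing: sum(successful order amounts) <= credit.
final class CustomerCredit {
    private final double credit;
    private final List<Double> successfulOrders = new ArrayList<>();

    CustomerCredit(double credit) {
        this.credit = credit;
    }

    // Accepts the order only if the invariant still holds after accepting it.
    synchronized boolean tryAccept(double amount) {
        double total = successfulOrders.stream().mapToDouble(Double::doubleValue).sum();
        if (total + amount > credit) {
            return false; // accepting would break the invariant, so the order fails
        }
        successfulOrders.add(amount);
        return true;
    }

    synchronized boolean invariantHolds() {
        return successfulOrders.stream().mapToDouble(Double::doubleValue).sum() <= credit;
    }
}
```

With a credit of 1000, orders of 600 and then 400 are accepted, but a 500 order in between is rejected.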
Saga Tends To Be ACD in Some Cases
SAGA-based architectures actually explicitly guarantee Atomicity, Consistency, and Durability, but do not necessarily guarantee Isolation.
- Atomicity: All transactions should be executed or all compensated.
- Consistency: SAGA involves two kinds of referential integrity: first, integrity within each microservice, handled by its local database; second, integrity between microservices, handled by the application.
- Durability: Handled by local databases
For the reasons illustrated in the table below, SAGA lacks Isolation:
Anomaly | Description | Example sequence
Lost update | One microservice reads data that another microservice then changes. | 1- T(i) reads the data; 2- T(j) changes the data; 3- T(i) changes the data => T(j)'s change is lost
Dirty reads | One microservice writes data, another reads it, and then the first one compensates its transaction. | 1- T(i) writes the data; 2- T(j) reads the data; 3- T(i) compensates the transaction => T(j) holds invalid data
Non-repeatable/fuzzy reads | Reads of the same data return different values because a write happened in between. | 1- T(i) reads the data; 2- T(k) writes the data; 3- T(j) reads the data => T(i) and T(j) see different values
However, this is not the end of the game. There are a variety of countermeasures for reducing the impact of isolation anomalies, such as:
- Versioning data: Each record carries a version. An update succeeds only if the record still has the version the updater originally read, and the update then increments the version.
- Re-reading the value: Before modifying a value, T(i) re-reads it to verify that it has not changed since T(i) first read it.
- Semantic locking: Set a lock flag on a record that prevents other transactions from accessing it until the flag is cleared.
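The versioning countermeasure is a form of optimistic concurrency control. The sketch below uses hypothetical Versioned and VersionedStore classes to replay the lost-update sequence from the table: the late writer's update is rejected because it carries a stale version. ConcurrentMap.replace(key, oldValue, newValue) makes the version check and the swap a single atomic step.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Immutable snapshot of a record and its version number.
final class Versioned {
    final String value;
    final long version;

    Versioned(String value, long version) {
        this.value = value;
        this.version = version;
    }
}

// Hypothetical key-value store that rejects updates based on a stale version.
final class VersionedStore {
    private final ConcurrentMap<String, Versioned> data = new ConcurrentHashMap<>();

    void insert(String key, String value) {
        data.put(key, new Versioned(value, 0));
    }

    Versioned read(String key) {
        return data.get(key);
    }

    // Succeeds only if the caller saw the current version; the new snapshot gets version + 1.
    boolean update(String key, String newValue, long expectedVersion) {
        Versioned current = data.get(key);
        if (current == null || current.version != expectedVersion) {
            return false;
        }
        // Versioned uses identity equality, so replace() fails if another writer
        // swapped in a new snapshot between get() and this call.
        return data.replace(key, current, new Versioned(newValue, expectedVersion + 1));
    }
}
```

A rejected writer re-reads the record and decides again, which turns a silent lost update into an explicit conflict.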
Case Study: Order Management System Implementation with SAGA
In this part, we present a complete overview and implementation of the order management system. As you can see, we have four microservices: OrderService, PaymentService, RestaurantService, and DeliveryService. Each microservice is triggered by events that come from outside of it, giving the following steps:
- An ORDER_CREATE event comes from the UI, and OrderService creates the order.
- PaymentService starts the payment after receiving the ORDER_CREATED event.
- If the payment is successful, an ORDER_PAID event is sent to RestaurantService. Otherwise, a FAIL event is sent to OrderService to compensate the transaction.
- RestaurantService, which is triggered by the ORDER_PAID event, tells the restaurant to prepare the order. If preparation succeeds, an ORDER_PREPARED event is sent to DeliveryService to deliver the order; otherwise, RestaurantService tells OrderService and PaymentService to compensate the transaction.
My practical implementation is based on the Spring Boot and Spring Cloud frameworks, together with Apache Kafka as the event handler. Every microservice exposes two APIs: the main transaction and the compensation transaction.
There are four Kafka topics: Pending_Orders, Success_Payment_Orders, Failed_Payment_Orders, and Deliverable_Orders. To simplify the implementation, I considered just one compensation scenario: a failed payment. I assume that all steps after the payment always succeed.
- OrderService puts new orders in Pending_Orders and listens to Failed_Payment_Orders to be notified of failed payments.
- PaymentService listens to Pending_Orders to be notified of new orders. If the payment succeeds, it puts the order in Success_Payment_Orders; if the payment fails, it puts the order in Failed_Payment_Orders.
- RestaurantService listens to Success_Payment_Orders and puts the result in Deliverable_Orders.
- DeliveryService consumes orders from Deliverable_Orders; every order there has already been paid successfully.
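The topic flow above can be summarized as a routing trace. TopicFlow is a hypothetical helper that only records which topics an order passes through under the stated assumption that only the payment can fail; the topic strings are the values defined in the Topics interface of the domain model below, including its "delivarable" spelling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the Kafka topics an order passes through in the case study.
final class TopicFlow {
    static final String PENDING = "pending_orders";
    static final String SUCCESS = "success_payment_orders";
    static final String FAILED = "failed_payment_orders";
    static final String DELIVERABLE = "delivarable_orders"; // spelling matches the Topics interface

    static List<String> route(boolean paymentSucceeds) {
        List<String> path = new ArrayList<>();
        path.add(PENDING);         // OrderService -> PaymentService
        if (!paymentSucceeds) {
            path.add(FAILED);      // PaymentService -> OrderService (compensation)
            return path;
        }
        path.add(SUCCESS);         // PaymentService -> RestaurantService
        path.add(DELIVERABLE);     // RestaurantService -> DeliveryService
        return path;
    }
}
```

Only the failed path ever touches failed_payment_orders, which is why OrderService is the only compensating service in this simplified version.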
All microservices in the case study share the following features:
- They are all built on four base model types, Customer, Item, Order, and PaymentState, plus a Topics interface that holds the static Kafka topic names. For simplicity and clean code, there are no hand-written getters and setters; Lombok generates them.
- Spring Cloud, Spring Boot, and Kafka dependencies have been added to the POM of every microservice.
- KafkaTemplate<String,Object> is used to put events in Kafka.
- The @KafkaListener annotation is used on a method to listen to a specific topic.
Domain Model
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Customer {
    private String name;
    private String cardNo;
}
public enum Item {
CHICKEN,MASHED_POTATO,FRIED_CHICKEN,BURGERS,SPAGHETTI;
}
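import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    private List<Item> items;
    private PaymentState paymentStat; // set by PaymentService: SUCCESS or FAIL
    private Double amount;
    private Customer customer;
}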
public enum PaymentState {
SUCCESS,FAIL;
}
public interface Topics {
    String FAILED_PAYMENT = "failed_payment_orders";
    // DELIVARABLE_ORDER and DELIVARABLE_ORDERS are two names for the same topic
    String DELIVARABLE_ORDER = "delivarable_orders";
    String DELIVARABLE_ORDERS = "delivarable_orders";
    String PENDING_ORDERS = "pending_orders";
    String SUCCESS_PAYMENT_ORDERS = "success_payment_orders";
}
Order Service
The first class in this microservice is OrderService, which receives the order from outside and puts it in the appropriate topic:
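import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    // Publishes the new order to pending_orders, where PaymentService picks it up.
    // send() is asynchronous, so the order is returned before the broker acknowledges it.
    public Order registerOrder(Order order) {
        kafkaTemplate.send(Topics.PENDING_ORDERS, order);
        return order;
    }
}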
The second class is the listener class, which listens to Failed_Payment_Orders for failed payment, reverses the order, and notifies the customer.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class PaymentListener {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Compensation for a failed payment: roll back the pending order and notify the customer.
    // The failed order is not forwarded to delivarable_orders, so it never reaches DeliveryService.
    @KafkaListener(topics = Topics.FAILED_PAYMENT, groupId = "order",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenToFailedPayments(Order order) {
        System.out.println("we have failed ordered and going rollback the order : " + order);
        // Cancel the stored order and send the customer a notification here.
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Trust only the model package and always deserialize message values as Order
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Order.class.getPackage().getName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Order.class);
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Order>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Finally, there is a REST controller that we use to test the code:
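import java.util.Arrays;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/order")
public class OrderController {

    @Autowired
    OrderService orderService;

    // Test endpoint: registers one order that PaymentService accepts (amount > 1000)
    // and one that it rejects (amount <= 1000). PaymentService overwrites paymentStat.
    @PostMapping(value = "/register")
    public void registerOrder() {
        Customer cr = new Customer("reza", "1111-2222-3333-4444");
        Customer ct = new Customer("test", "5555-6666-7777-8888");
        List<Item> items = Arrays.asList(Item.BURGERS, Item.CHICKEN, Item.SPAGHETTI);
        Order successOrder = new Order(items, PaymentState.SUCCESS, 2000.0, cr);
        Order failOrder = new Order(items, PaymentState.SUCCESS, 100.0, ct);
        orderService.registerOrder(successOrder);
        orderService.registerOrder(failOrder);
    }
}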
Payment Service
This microservice has just one listener class, which listens to Pending_Orders and makes the right decision based on the result of the payment.
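import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class OrderListener {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    // Consumes new orders from pending_orders. The amount check only simulates a
    // payment gateway: orders above 1000 succeed, all others fail.
    @KafkaListener(topics = Topics.PENDING_ORDERS, groupId = "order",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenToPendingOrders(Order order) {
        if (order.getAmount() > 1000D) {
            order.setPaymentStat(PaymentState.SUCCESS);
            kafkaTemplate.send(Topics.SUCCESS_PAYMENT_ORDERS, order); // continue the saga
        } else {
            order.setPaymentStat(PaymentState.FAIL);
            kafkaTemplate.send(Topics.FAILED_PAYMENT, order);         // trigger compensation
        }
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Trust only the model package and always deserialize message values as Order
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Order.class.getPackage().getName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Order.class);
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Order>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}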
Other Services
There are two more microservices, RestaurantService and DeliveryService; you can find the complete projects for the whole ecosystem on my GitHub.
Finally, here is a sample execution result that includes the failed payment:
2021-09-12 19:34:02.052 INFO 11956 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-order-1, groupId=order] Discovered group coordinator 192.168.7.61:9092 (id: 2147483646 rack: null)
2021-09-12 19:34:02.052 INFO 11956 --- [-thread | order] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-order-1, groupId=order] Discovered group coordinator 192.168.7.61:9092 (id: 2147483646 rack: null)
we have failed ordered and going rollback the order : Order(items=[BURGERS, CHICKEN, SPAGHETTI], paymentStat=FAIL, amount=100.0, customer=Customer(name=test, cardNo=5555-6666-7777-8888))
2021-09-12 19:34:02.052 INFO 8644 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-order-1, groupId=order] Discovered group coordinator 192.168.7.61:9092 (id: 2147483646 rack: null)
2021-09-12 19:34:02.052 INFO 8644 --- [-thread | order] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-order-1, groupId=order] Discovered group coordinator 192.168.7.61:9092 (id: 2147483646 rack: null)
send notification to kitchen and after food is ready for order : Order(items=[BURGERS, CHICKEN, SPAGHETTI], paymentStat=SUCCESS, amount=2000.0, customer=Customer(name=reza, cardNo=1111-2222-3333-4444)) put it in deliverable orders