A Robust Distributed Payment Network With Enhanced Audit Functionality - Part 2: Spring Boot, Axon, and Implementation
In this post, I demonstrate how to build a distributed payment network system with enhanced audit functionality using Spring Boot, Spring Cloud, Axon, and Neo4j.
This post continues Part 1. We build a simplified but robust and scalable Qiwi-like payment network. The network allows its users to transfer money between network accounts. To satisfy the functional requirements from Part 1, we use a microservice architecture with one microservice per domain object. Specifically, we use Spring Boot and Spring Cloud to make the system scalable and robust. Also, we use the Axon framework to conveniently apply the CQRS and Event Sourcing patterns. Finally, we use a Neo4j graph database, alongside a Postgres database, to efficiently audit suspicious money transfers between "related" accounts.
This post is organized as follows. First, I describe the system and its parts. Second, I describe how to implement the account and transfer microservices. Finally, I describe e2e test scenarios to verify if the system works as intended.
System
The component diagram of our system is depicted in Fig 1. The fully working code can be found here; the code is based on Sergey Kargopolov's Udemy course.
In this post, we are not concerned with Spring Cloud security. A reader interested in security may consult, for example, this post, which gives an in-depth exposition of OAuth2 and two-factor authorization. Here we assume the users are already authorized. Let's see what is what in Fig 1.
According to Part 1, there are 2 domain microservices: the Account and the Transfer services. The Transfer service transfers money between accounts; the Account service maintains the accounts. The instances of the Account and Transfer services start on their own ports and register with the Eureka discovery service (DS). The Spring Cloud gateway (G) obtains the deployment addresses of the services from DS. The user interacts with the system via G.
The domain services store their data in the relational databases RDBa and RDBt (PostgreSQL in our case). Also, the domain services share a single common graph database, GraphDB (Neo4j in our case). Finally, for the domain services to exchange commands, events, and queries, as described in Part 1, the command, event, and query classes are placed in a shared library.
This system is robust and scalable. Indeed, multiple instances of the same domain service can be started, each on its own port, and all of them register with DS. DS reports their addresses to G; G routes and load-balances the incoming requests. If one of the domain service instances goes down, DS recognizes this fact and notifies G so that no requests are sent to the failed instance.
As pointed out in Part 1, we don't explicitly create a User microservice, since we assume a user has no more than one account and is already authorized. The only non-credential field of the User domain is his/her "relatives". This information is included in the Account Read Model (see Part 1 to refresh the definitions). Before we see how our services are implemented, let's review the Axon framework annotations.
Brief Summary of Axon Annotations
The Axon framework provides the tools to construct CQRS and ES systems. Here is a brief summary of these tools; for more details, see the documentation.
Orchestrator
In the Axon framework, orchestrators are called "sagas". A saga is a public class annotated with @Saga. A saga usually has a CommandGateway and a QueryGateway @Autowired into it. A saga sends commands to the command bus by means of the CommandGateway. On every scenario step, a saga receives an event and issues a command. For every saga, there is a public void event handler that starts the saga; the handler is annotated with @StartSaga and @SagaEventHandler. Also, for every saga, there is a public void event handler that ends the saga; the handler is annotated with @EndSaga and @SagaEventHandler. Intermediate event handlers are annotated with just @SagaEventHandler.
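To make these annotations concrete, here is a minimal saga sketch; it assumes event and command classes like the ones described later in this post, and the constructors and getters are illustrative assumptions rather than the project's exact API:

@Saga
public class TransactionSaga {

    // Marked transient so the gateway is not serialized with the saga state; Axon injects it.
    @Autowired
    private transient CommandGateway commandGateway;

    // Starts the saga when a transaction is created and issues the first command of the scenario.
    @StartSaga
    @SagaEventHandler(associationProperty = "transactionId")
    public void handle(TransactionCreatedEvent event) {
        commandGateway.send(new ReserveMoneyCommand(event.getAccountFromId(),
                event.getTransactionId(), event.getAmount()));
    }

    // Intermediate step: the money is reserved, so ask the receiving account to deposit it.
    @SagaEventHandler(associationProperty = "transactionId")
    public void handle(MoneyReservedEvent event) {
        commandGateway.send(new DepositeMoneyCommand(event.getAccountToId(),
                event.getTransactionId(), event.getAmount()));
    }

    // Ends the saga when the transaction is approved.
    @EndSaga
    @SagaEventHandler(associationProperty = "transactionId")
    public void handle(TransactionApprovedEvent event) {
        // nothing left to do here; the Read model is updated by a separate event handler
    }
}

The associationProperty tells Axon which field of the incoming event ties it to a running saga instance; here all events carry a transactionId.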
Aggregate
Every Axon aggregate is a public class annotated with @Aggregate. An aggregate has its own state fields; there must be a single @AggregateIdentifier-annotated field among them. An aggregate receives commands and emits events to change its own state and notify the system that the command was processed. The aggregate constructor is annotated with @CommandHandler. Other command handlers (they are public void) are also annotated with @CommandHandler. An aggregate changes its own state in public void methods annotated with @EventSourcingHandler.

Notice how an aggregate responds to a command. If the aggregate constructor is called, the aggregate identifier (a String in our case) is returned. For a successfully executed regular command, the aggregate responds with null. If something goes wrong, an exception is thrown.
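As an illustration, here is a minimal sketch of an Account aggregate skeleton; the command and event classes and their fields are assumptions based on the description given later in this post:

@Aggregate
public class AccountAggregate {

    @AggregateIdentifier
    private String accountId;
    private BigDecimal balance;
    private boolean isBlocked;

    protected AccountAggregate() {
        // no-arg constructor required by Axon
    }

    // The constructor command handler: the new aggregate's identifier is returned to the caller.
    @CommandHandler
    public AccountAggregate(CreateAccountCommand command) {
        AggregateLifecycle.apply(new AccountCreatedEvent(command.getAccountId(), command.getBalance()));
    }

    // A regular command handler: validate, then emit an event (null on success, an exception on failure).
    @CommandHandler
    public void handle(ReserveMoneyCommand command) {
        if (isBlocked || balance.compareTo(command.getAmount()) < 0) {
            throw new IllegalArgumentException("Cannot reserve money on the account " + accountId);
        }
        AggregateLifecycle.apply(new MoneyReservedEvent(accountId, command.getTransactionId(), command.getAmount()));
    }

    // The aggregate changes its own state only in event sourcing handlers.
    @EventSourcingHandler
    public void on(AccountCreatedEvent event) {
        this.accountId = event.getAccountId();
        this.balance = event.getBalance();
        this.isBlocked = false;
    }

    @EventSourcingHandler
    public void on(MoneyReservedEvent event) {
        this.balance = this.balance.subtract(event.getAmount());
    }
}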
Command
Every command in the Axon framework is a POJO; among the fields of the POJO, there must be one annotated with @TargetAggregateIdentifier. As the name suggests, the field must match the @AggregateIdentifier of the aggregate the command is intended for. To send a command to the command bus, one feeds the command to a commandGateway.
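For example, the ReserveMoney command described later in this post could be sketched as the following POJO; the Lombok annotations and the exact field set are assumptions made for brevity:

@Builder
@Data
public class ReserveMoneyCommand {
    // Carries the identifier of the Account aggregate this command is routed to.
    @TargetAggregateIdentifier
    private final String accountId;
    private final String transactionId;
    private final BigDecimal amount;
}

Such a command is then dispatched with commandGateway.send(reserveMoneyCommand) or commandGateway.sendAndWait(reserveMoneyCommand).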
Event Handler
Finally, here is the tool to update the Read model on an event emitted by an aggregate. In the Axon framework, such handlers are located in public classes annotated with @Component; these components usually access database repositories. Every public void event handler is annotated with @EventHandler; inside these handlers, Read models get updated.
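A minimal sketch of such a projection component might look like this; AccountRepository, AccountEntity, and the event getters are assumed names consistent with the Account service described below:

@Component
public class AccountEventHandler {

    // Spring Data repository for the accounts Read-model table.
    private final AccountRepository accountRepository;

    public AccountEventHandler(AccountRepository accountRepository) {
        this.accountRepository = accountRepository;
    }

    // Updates the Read model whenever a new account is created.
    @EventHandler
    public void on(AccountCreatedEvent event) {
        AccountEntity entity = new AccountEntity();
        entity.setAccountId(event.getAccountId());
        entity.setBalance(event.getBalance());
        accountRepository.save(entity);
    }
}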
Auxiliary Tables
In the Axon framework, every aggregate and saga have their information stored in several tables. For example, the Account microservice stores Read model and auxiliary tables in its database:

Here the accounts and accountlookup tables belong to the Read model. The former stores the current accounts; the latter stores only the accounts' Ids, so the service can tell the user whether such an account exists. The saga_entry, token_entry, and association_value_entry tables are auxiliary Axon tables that maintain the saga state for every saga step. The token_entry table is needed to know at which saga step to continue executing the saga if the Account microservice first went down and then started up. To develop the application faster, we need to automatically clean up these tables before every e2e test scenario; a minimal sketch of such a cleanup endpoint is shown below.
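This is only a sketch, assuming a JdbcTemplate-based endpoint; the path, class name, and table list are illustrative (the real project exposes its own cleanup endpoints, see the code):

@RestController
@RequestMapping("/management")
public class CleanupController {

    private final JdbcTemplate jdbcTemplate;

    public CleanupController(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    // Truncates the Read-model and auxiliary Axon tables so every e2e scenario starts from a clean state.
    @DeleteMapping("/tables")
    public String cleanUpTables() {
        String[] tables = {"accounts", "accountlookup", "saga_entry", "token_entry", "association_value_entry"};
        for (String table : tables) {
            jdbcTemplate.execute("TRUNCATE TABLE " + table);
        }
        return "Tables cleaned";
    }
}

Let's see how the Account service is implemented.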
Account Service Implementation
According to the CQRS and ES patterns from Part 1, we split the Account service into 2 parts: the Read and Write models (Fig 2).
The persistent part of the Read model is as follows. There is an accounts table in RDBa; the table columns match the Account aggregate fields, plus automatically populated created and updated LocalDateTime fields, annotated with @CreationTimestamp and @UpdateTimestamp, respectively. Also, there is an accountlookup table in RDBa that stores only accountIds; this is needed to check whether an account with a specified accountId exists. Finally, there is the GraphDB, where the Account service persists only the account nodes and only with their accountIds. The GraphDB is shared between the Account and the Transaction services.
The Write model is the Account aggregate. The aggregate's fields are specified by the system's domain model. In this case, the fields are self-explanatory: accountId (a string), balance (a BigDecimal), isBlocked (a boolean). Notice that Axon stores all the events with their timestamps, so there is no need for us to have a dedicated timestamp field in the aggregate.
The Account aggregate receives commands and emits events; the aggregate also handles its own events and updates its own state. The commands can be deduced from the functional requirements from Part 1. The CreateAccount command is needed to create a new account; the command's arguments specify the initial account balance. If the account is created successfully, the system returns a random UUID accountId.

The ReserveMoney and DepositeMoney commands are needed to transfer money from one account to another. The former command has the amount to transfer and the accountFromId as its arguments. The latter command has the amount to transfer and the accountToId as its arguments. These 2 commands also have a transactionId field. The field is used when money is transferred by a transaction from one account to another. How these commands work in successful and failed transfer scenarios is described later in this post.
A RollbackMoney command is issued if the receiving account goes down or is blocked. The command's arguments are the accountId and the amount of money. The command does the same as the DepositeMoneyCommand, but for the account the money was reserved from. How this command works in the scenarios is described later in the post.
Also, the Block account command blocks an account. The command has an accountId as its argument. Money can be transferred neither from nor to a blocked account.
Finally, the Account microservice contains endpoints to clean the auxiliary tables described above (see the code). These endpoints are easy to use in e2e tests. Let's see how the transaction service is implemented.
Transaction Service Implementation
The main CQRS and Event Sourcing elements of the Transaction microservice are shown in Fig 3.
The persistent part of the Read model in the Transaction service is as follows. All transactions are stored in the transactions table of RDBt; the table columns match the Transaction aggregate fields, plus automatically populated created and updated LocalDateTime fields, annotated with @CreationTimestamp and @UpdateTimestamp, respectively. Partially completed and rejected transactions are stored in this table as well, but with the CREATED or REJECTED transaction status.

Also, the persistent part of the Read model includes the GraphDB, where each node matches an account but includes only the account's Id. Each edge matches a transaction or a relation; the former includes only the transaction's Id, the latter includes only the relation type, like "uncle". The purpose of this GraphDB is to detect cycles. An investigator can then dig deeper into RDBt and RDBa to get the details.
The Write model is the Transaction aggregate. The aggregate's fields are specified by the system's domain model. In this case, the fields are self-explanatory: transactionId (a string), accountToId (a string), accountFromId (a string), amount (a BigDecimal), transactionStatus (an enumeration: CREATED, APPROVED, REJECTED). Just as for the Account aggregate, there is no need to explicitly create a timestamp field in the Transaction aggregate; Axon does all this for us.
The Transaction aggregate accepts 3 commands: CreateTransaction, ApproveTransaction, and RejectTransaction. A CreateTransaction command has the following fields: transactionId (a string, the aggregate identifier), accountToId (a string), accountFromId (a string), amount (a BigDecimal), transactionStatus (an enumeration).
An ApproveTransaction command has 3 fields: transactionId (the aggregate identifier), accountToId, accountFromId. This command "wraps up" a successful transaction. A RejectTransaction command has 2 fields: transactionId (the aggregate identifier), reason (a string). This command "wraps up" an unsuccessful transaction and stores the reason for the failure. Let's investigate how to use the GraphDB in this system.
Note on How To Use a Graph Database in Microservice Systems
Let's try to persist an accountFrom-transaction->accountTo relation in Neo4j. Neo4j provides Object Graph Mapping (OGM) functionality for Spring Data. To do this by means of the OGM, we would need to represent the related GDB nodes, for example, in the following way (similarly to how we do it in an RDB with an ORM):
@RelationshipEntity(type = "TRANSACTION")
public class Transaction {

    @StartNode
    private Account accountFrom;

    @EndNode
    private Account accountTo;
}
However, in a microservice architecture, it is problematic to use this approach, since the nodes would have to know too much about each other: a Transaction would have to know not just the accountTo and accountFrom Ids, but also the whole accountTo and accountFrom classes. To keep the domain elements as loosely coupled as possible, we should make sure that a Transaction knows only the account Ids.

To implement the accountFrom-transaction->accountTo relation while keeping this decoupling, let's bypass the OGM and use the Neo4j Java driver with a Cypher query directly:
// Obtained once, e.g., in the repository's constructor.
driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password));

public String persistTransactionRelation(String transactionId,
                                         String accountFromId,
                                         String accountToId) {
    Map<String, Object> params = new HashMap<>();
    params.put("transactionId", transactionId);
    params.put("accountFromId", accountFromId);
    params.put("accountToId", accountToId);

    try (Session session = driver.session()) {
        return session.writeTransaction(new TransactionWork<String>() {
            @Override
            public String execute(Transaction tx) {
                Result result = tx.run(
                    "MERGE (aFrom:Account {accountId: $accountFromId}) " +
                    "MERGE (aTo:Account {accountId: $accountToId}) " +
                    "MERGE (aFrom)-[l:Transaction]->(aTo) " +
                    "ON CREATE SET l.transactionId = $transactionId " +
                    "ON MATCH SET l.transactionId = $transactionId " +
                    "RETURN l.transactionId",
                    params);
                return result.single().get(0).asString();
            }
        });
    }
}
First, we get a Neo4j driver. Then we map the method's input parameters to the query's input parameters. Finally, we execute the Cypher query. The query first creates an accountFrom node (if it doesn't exist). Then the query creates an accountTo node (if it doesn't exist). Finally, the query creates the accountFrom-transaction->accountTo relation (if it doesn't exist) or updates the transactionId on the relation if the relation already exists. The query returns the transactionId if successful.
Also notice that, as of now (version 4.4), Neo4j does not support undirected relationships between its nodes; every relationship has a direction. So, for every 2 related accounts, we have to create 2 relations, like an "uncle" relation and a "nephew" relation.
Finally, let's see how to detect these kinds of "transaction-relation" cycles in Neo4j.
MATCH (aFrom:Account)-[:Transaction*..100]->(aTo:Account), (aFrom)-[:Relation]->(aTo)
RETURN aFrom, aTo
Here, [:Transaction*..100] says there should be a path of up to 100 consecutive Transaction relationships AND a single Relation relationship between the aFrom and aTo accounts. For the network in Fig 4 (A), this Cypher query gives the result in Fig 4 (B):
The Cypher query correctly detects the cycle. A sketch of how an audit job could run this query from Java is shown below.
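This is only a minimal sketch; it reuses the Neo4j Java driver instance from the snippet above, and the method name and result mapping are illustrative, not part of the project:

public List<String> findSuspiciousAccountPairs() {
    // Finds account pairs connected both by a chain of transactions and by a direct relation.
    String query =
        "MATCH (aFrom:Account)-[:Transaction*..100]->(aTo:Account), " +
        "      (aFrom)-[:Relation]->(aTo) " +
        "RETURN aFrom.accountId AS fromId, aTo.accountId AS toId";

    try (Session session = driver.session()) {
        return session.readTransaction(tx ->
            tx.run(query).list(record ->
                record.get("fromId").asString() + " -> " + record.get("toId").asString()));
    }
}

Let's see how all this works dynamically.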
Transaction Dynamics
We investigate sequence diagrams of 3 transactions: a successful transaction, a failed transaction due to insufficient funds, and a failed transaction due to the recipient account being down.
Successful Transaction
Assumptions: 1) there are two accounts, accountFrom and accountTo; 2) the former account has enough money deposited; 3) neither account is blocked.
This scenario starts when a user calls the TransactionCommandController.createTransaction(...) POST method and provides the accountFromId, accountToId, and amount to transfer. The controller creates and sends a CreateTransactionCommand to the constructor of a Transaction aggregate. As a new Transaction aggregate instance is created, the transaction saga gets notified by a TransactionCreatedEvent. At the same time, the Transaction Read model gets updated with the newly created transaction.
Next, the saga issues a ReserveMoneyCommand to the accountFrom aggregate. The aggregate checks that the account is not blocked and has enough money. If so, the aggregate reduces its balance appropriately and issues a MoneyReservedEvent. The AccountEventHandler handles the event and updates the accountFrom Read model.
As the saga gets the MoneyReservedEvent, the saga issues a DepositeMoneyCommand to the accountTo aggregate. The aggregate increases its balance appropriately and issues a MoneyDepositedEvent. The AccountEventHandler handles the event and updates the accountTo Read model.
Then, the saga receives the MoneyDepositedEvent and issues a TransactionApprovedCommand to the Transaction aggregate. The aggregate fires a TransactionApprovedEvent. The event gets received by the TransactionEventHandler to update the Transaction Read model, including the GraphDB.
Finally, the saga receives the TransactionApprovedEvent, creates a TransactionSummary, and feeds it to the queryUpdateEmitter. The TransactionCommandController receives this summary (as an update to its subscription query) and returns an HTTP response to the user.
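The query side of this interplay could look roughly like the following hedged sketch; FindTransactionQuery, TransactionSummary, CreateTransactionRequest, and their shapes are assumed names based on the description above, not necessarily the project's actual classes:

// In the controller: subscribe before sending the command, then wait for the saga's update.
@PostMapping
public TransactionSummary createTransaction(@RequestBody CreateTransactionRequest request) {
    String transactionId = UUID.randomUUID().toString();

    SubscriptionQueryResult<TransactionSummary, TransactionSummary> queryResult =
            queryGateway.subscriptionQuery(new FindTransactionQuery(transactionId),
                    ResponseTypes.instanceOf(TransactionSummary.class),
                    ResponseTypes.instanceOf(TransactionSummary.class));
    try {
        commandGateway.sendAndWait(new CreateTransactionCommand(transactionId,
                request.getAccountToId(), request.getAccountFromId(), request.getAmount()));
        // Blocks until the saga emits the summary (APPROVED or REJECTED).
        return queryResult.updates().blockFirst();
    } finally {
        queryResult.close();
    }
}

// In the saga: push the final status to whoever subscribed to this transactionId.
queryUpdateEmitter.emit(FindTransactionQuery.class,
        query -> query.getTransactionId().equals(transactionId),
        new TransactionSummary(transactionId, TransactionStatus.APPROVED, ""));

Let's see what happens if something goes wrong.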
Failed Transaction: Insufficient Funds
The scenario goes like the first one, except the accountFrom aggregate finds its balance to be insufficient and throws an exception:
@CommandHandler
public void handle(ReserveMoneyCommand reserveMoneyCommand) {
    // Reject the reservation if the current balance is smaller than the requested amount.
    if (balance.compareTo(reserveMoneyCommand.getAmount()) < 0) {
        LOGGER.info("Insufficient funds on the account " + this.accountId);
        throw new IllegalArgumentException("Insufficient funds on the account " + this.accountId);
    }
    ...
}
Once the saga catches the exception, the saga issues a RejectTransactionCommand:
@StartSaga
@SagaEventHandler(associationProperty = "transactionId")
public void handle(TransactionCreatedEvent transactionCreatedEvent) {
    ReserveMoneyCommand reserveMoneyCommand = ReserveMoneyCommand.builder()
        ...
    commandGateway.send(reserveMoneyCommand, new CommandCallback<ReserveMoneyCommand, Object>() {
        @Override
        public void onResult(CommandMessage<? extends ReserveMoneyCommand> commandMessage,
                             CommandResultMessage<? extends Object> commandResultMessage) {
            // If the money reservation failed (e.g., insufficient funds), reject the whole transaction.
            if (commandResultMessage.isExceptional()) {
                RejectTransactionCommand rejectTransactionCommand =
                        new RejectTransactionCommand(transactionCreatedEvent.getTransactionId(),
                                commandResultMessage.exceptionResult().getMessage());
                commandGateway.send(rejectTransactionCommand);
            }
        }
    });
}
The Transaction aggregate handles the command and fires a TransactionRejectedEvent. The saga handles the event and emits a TransactionSummary through the queryUpdateEmitter. The TransactionCommandController uses this summary to create an HTTP response.
Failed Transaction: Recipient Account Is Down or Blocked
In this case, after a DepositeMoneyCommand is sent to the accountTo aggregate, the system throws either a ServiceNotReachableException or an IllegalArgumentException("account is blocked"). The saga catches the exception and initiates a compensating transaction:
@SagaEventHandler(associationProperty = "transactionId")
public void handle(MoneyReservedEvent moneyReservedEvent) {
    ...
    try {
        result = commandGateway.sendAndWait(depositeMoneyCommand, 10000, TimeUnit.MILLISECONDS);
        LOGGER.info("commandGateway.sendAndWait(DepositeMoneyCommand) " + result);
    } catch (Exception ex) {
        LOGGER.error("Saga MoneyReservedEvent after DepositeMoneyCommand is sent " + ex.getMessage());
        // Start the compensating transaction
        cancelMoneyReservation(moneyReservedEvent, ex.getMessage());
        return;
    }
    ...
}

private void cancelMoneyReservation(MoneyReservedEvent moneyReservedEvent, String reason) {
    ...
    MoneyRollbackCommand moneyRollbackCommand =
        MoneyRollbackCommand.builder()
            ...
            .build();
    LOGGER.info("MoneyRollbackCommand is about to be sent");
    commandGateway.send(moneyRollbackCommand);
}
The accountFrom aggregate receives the command, emits a MoneyRolledBack event, and restores the aggregate's balance to its pre-transaction value. The saga handles the event and issues a RejectTransactionCommand. After that, the flow proceeds as in the previous scenario. Let's see how to automatically test all this.
Results: e2e Test Scenarios
Our test scenarios consist of the following elementary e2e tests (see the HTTP folder in the code for details):
- Create an account with a specified balance. The input: balance (a BigDecimal); the output: a random accountId (a string) if successful, an error if not.
- Transfer a specified amount of money from account A (Id A) to account B (Id B). The input: an accountFromId (a string), an accountToId (a string), an amount (a BigDecimal); the output: a TransactionStatus (an enum).
- Block/unblock an account. The input: an accountId (a string); the output: a message (a string) if the account was blocked. Recall that money can be transferred neither from nor to a blocked account.
- Relate two accounts (aux). The input: accountFromId (a string), accountToId (a string), relation (a string); the output: a message (a string) if the accounts were related.
- Clean up the Axon, Transfer, and Account CQRS tables (aux). The input: none; the output: a message (a string) if the tables were successfully cleaned.
- Retrieve an account balance (aux). The input: an accountId (a string); the output: the balance (a BigDecimal) or an error message if the account doesn't exist.
The transfer money scenario works as follows:
1. Clean up the Axon, Transfer, Account CQRS tables.
2. Create an account with a specified balance.
3. Create another account with a specified balance.
4. Transfer an amount of money from the first account to the second. If there is no overdraft, the accounts are not blocked, and the Account and Transfer services work as expected, we get a TransactionStatus.APPROVED status. Please check other scenarios in the postman folder.
5. Retrieve the accounts' balances and verify that the amount was transferred.
These steps can be easily automated with Postman (or similar) software, or scripted directly, as sketched below. Please check the instructions on how to install and run the app.
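The following is only a hedged sketch of such a scripted run in plain Java; the gateway address, endpoint paths, and payloads are assumptions rather than the project's actual API:

public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newHttpClient();
    String gw = "http://localhost:8082"; // assumed gateway address

    // 1. Clean up the Axon, Transfer, and Account CQRS tables (assumed endpoint).
    client.send(HttpRequest.newBuilder(URI.create(gw + "/accounts/management/tables")).DELETE().build(),
            HttpResponse.BodyHandlers.ofString());

    // 2-3. Create two accounts with initial balances (assumed path and payload).
    HttpResponse<String> created = client.send(HttpRequest.newBuilder(URI.create(gw + "/accounts"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString("{\"balance\": 100}"))
            .build(), HttpResponse.BodyHandlers.ofString());
    // ... repeat for the second account and parse both accountIds from the responses ...

    // 4. Transfer money from the first account to the second and check the returned status.
    HttpResponse<String> transfer = client.send(HttpRequest.newBuilder(URI.create(gw + "/transactions"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(
                    "{\"accountFromId\":\"<idFrom>\",\"accountToId\":\"<idTo>\",\"amount\":10}"))
            .build(), HttpResponse.BodyHandlers.ofString());
    System.out.println("Transfer status: " + transfer.body()); // expected to contain APPROVED

    // 5. Retrieve both balances (assumed endpoint) and verify that the amount was transferred.
}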
Conclusion
In this post, I demonstrated how to implement a simple distributed payment network system with enhanced audit functionality using Axon, Spring Boot, Spring Cloud, Neo4j, and Postgres. Also, I pointed out how to use a graph DB in a microservice system and how flexible a GraphDB becomes in a CQRS system. Finally, I described how successful and unsuccessful transaction scenarios work.
Acknowledgment
I would like to thank my mentor Sergey Suchok for his help on this post.