Ports and Adapters Architecture with Kafka, Avro, and Spring-Boot
In this post, we will be implementing a Kafka Producer and Consumer using the Ports and Adapters (a.k.a. Hexagonal) architecture in a multi-module Maven project.
We will also be using the KafkaAvroSerializer to send specific Avro types using Kafka and the Kafka Schema Registry.
Overall Flow
The overall workflow is as follows: a domain object (Person.java) is sent from BusinessDomainService to Kafka, where it is serialized to an Avro object (PersonDto.java); it is then consumed from the Kafka topic by a Kafka consumer and translated from the Avro object (PersonDto.java) back to the domain object (Person.java) before being handed back to BusinessDomainService for some arbitrary post-processing.
The whole flow is a little contrived in that we are sending a Kafka message half-way around the world only to arrive for post-processing back in BusinessDomainService, but, in my opinion, it nicely closes the loop so I am keeping it like that.
Implementation
In terms of implementation, we will be going with a multi-module Maven project, and here is the list of modules we will need.
application module
This being a Spring Boot application, this module houses the main @SpringBootApplication class to jump-start the application.
domain module
This is the core business/functional module and it is here that all our business service classes (BusinessDomainService.java) and business domain model classes (Person.java) live.
In terms of dependencies, true to Hexagonal Architecture, the domain module depends on nothing outside itself; it is the core business domain module and should therefore know nothing about the outside world. Here sits the business logic of your application, and the goal is to have it written in plain language so that an analyst, or even a non-technical person, can understand it. Inside it we use a domain-specific language that business people can follow easily. It should also be agnostic of any Java framework, because frameworks are only the scaffolding of an application; the core is its heart, the part that encapsulates the logic.
This module houses my core domain/business model i.e. Person.java.
package com.purnima.jain.domain.model;

public class Person {

    private String firstName;
    private String lastName;

    ..........
}
It also includes the core business service class, i.e. BusinessDomainService.java. This could be defined as an interface, but for simplicity I have kept it as a concrete class here.
public class BusinessDomainService {

    private static final Logger logger = LoggerFactory.getLogger(BusinessDomainService.class);

    @Autowired
    ProducerService producerService;

    public void generateAndSendMessage() {
        for (int i = 0; i < 5; i++) {
            ............
            logger.info("Generating Person data to send to ProducerService :: person :: {}", person);
            producerService.sendMessage(person);
        }
    }

    public void postProcessReceivedMessage(Person person) {
        logger.info("Post Processing the received message from Kafka-Consumer :: person :: {}", person);
    }
}
If you dig a little deeper, you will notice that I defined ProducerService as an interface, and that has far-reaching implications for our design. As per Hexagonal Architecture, my domain should be completely technology-agnostic, so there is no reason for it to know what delivery mechanism I will be using to send a produced message; hence ProducerService is defined as a Port. It can have any number of implementations, or Adapters, but my domain has no business knowing about any of them.
We will talk about this adapter implementation a little later when we come to it.
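To make the Port/Adapter split concrete, here is a minimal, self-contained sketch (not from the article's source; Person is reduced to its two fields and the in-memory adapter is purely hypothetical). The point is that the domain calls only the ProducerService interface, so any adapter, Kafka-backed or otherwise, can stand behind it:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for the domain model class from the article.
class Person {
    private final String firstName;
    private final String lastName;

    Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() { return firstName; }
    public String getLastName() { return lastName; }
}

// The Port: owned by the domain module; the domain sees only this abstraction.
interface ProducerService {
    void sendMessage(Person person);
}

// A throwaway Adapter: the domain never knows whether messages go to Kafka or a List.
class InMemoryProducerService implements ProducerService {
    final List<Person> sent = new ArrayList<>();

    @Override
    public void sendMessage(Person person) {
        sent.add(person);
    }
}
```

Swapping this adapter for the Kafka-backed one later requires no change to the domain module, which is exactly the isolation the Port buys us.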
avro module
This module hosts the person.avsc file which is our Avro Schema for PersonDto. The pom.xml of this module will have the avro-maven-plugin required to generate the Java classes from the given Avro Schema. These DTOs are what will be used to transport and store data in Kafka.
The avro module is similar to the domain module in this respect: other than the Avro dependency and the avro-maven-plugin that auto-generates the Java classes corresponding to the provided Avro schema files, this module should not be aware of anything.
At the same time, there is no reason for the domain module to be aware of this module; the domain module, again, does not care what format the delivery mechanism uses to transfer the data over the wire. All it cares about is that it sends a Person object (Person.java in the domain module) and gets back an instance of Person in the received message.
{
    "namespace": "com.purnima.jain.avro.dto",
    "type": "record",
    "name": "PersonDto",
    "fields": [
        {
            "name": "firstName",
            "type": "string",
            "doc": "the first name of a person"
        },
        {
            "name": "lastName",
            "type": "string",
            "doc": "the last name of a person"
        }
    ]
}
kafka-producer module
This module basically forms a part of our infrastructure layer and is responsible for converting the domain model classes (Person.java) to its Avro-specific version (PersonDto.java) and then publishing it to Kafka. This module houses the Kafka dependencies and is the only module (other than Kafka-consumer) that is aware that Kafka is being used as a messaging infrastructure.
For a change, Kafka-producer has quite a few inward dependencies like domain and Avro module. It needs to depend on the domain module because the references of the business model classes that need to be sent over to Kafka reside in the domain, and at the same time, it needs to depend on the Avro module because it is the Kafka-producer that is responsible for converting business model classes to their corresponding Avro DTOs for transmitting them over Kafka.
It is the KafkaProducerService that acts as an Adapter for the ProducerService port defined in the domain module that we talked about earlier; it is also the class that actually interacts with Kafka to send a message.
@Service
public class KafkaProducerService implements ProducerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerService.class);

    @Value("${spring.kafka.topic-name}")
    private String topicName;

    private KafkaTemplate<String, PersonDto> kafkaTemplate;

    KafkaProducerService(KafkaTemplate<String, PersonDto> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void sendMessage(Person person) {
        PersonDto personDto = PersonDto.newBuilder()
                .setFirstName(person.getFirstName())
                .setLastName(person.getLastName())
                .build();
        logger.info("Sending message to Kafka :: personDto :: {}", personDto);
        ListenableFuture<SendResult<String, PersonDto>> future = kafkaTemplate.send(topicName, personDto);
        ..................
    }
}
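The article does not show how the KafkaTemplate above is wired up, so here is a hedged sketch of what a typical producer configuration could look like with Confluent's KafkaAvroSerializer; the broker address, registry URL, and class name KafkaProducerConfig are placeholders, not the article's actual code:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import com.purnima.jain.avro.dto.PersonDto;

// Sketch only: addresses below are placeholders for a local setup.
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, PersonDto> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // KafkaAvroSerializer registers/looks up the PersonDto schema in the Schema Registry
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, PersonDto> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```

Because this configuration lives in the kafka-producer module, the Kafka and Schema Registry details never leak into the domain.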
kafka-consumer module
This module also forms a part of our infrastructure layer and is responsible for the reverse conversion: translating the Avro-specific version (PersonDto.java) read from Kafka back into the corresponding domain model class (Person.java). Like kafka-producer, this module houses the Kafka dependencies and is the only module (other than kafka-producer) that is aware that Kafka is being used as a messaging infrastructure.
Similar to Kafka-producer, it too needs to have a dependency on the domain and Avro module for the same reason as Kafka-producer above.
The class KafkaConsumerService.java consumes the message from the Kafka topic and delegates it to BusinessDomainService in the domain module to perform any post-processing and to close the loop.
@Service
public class KafkaConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);

    @Autowired
    BusinessDomainService businessDomainService;

    @KafkaListener(topics = "${spring.kafka.topic-name}")
    public void listen(PersonDto personDto, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        logger.info("Receiving message from Kafka :: personDto :: {} from partition: {}", personDto, partition);
        Person person = Person.builder()
                .firstName(personDto.getFirstName().toString())
                .lastName(personDto.getLastName().toString())
                .build();
        businessDomainService.postProcessReceivedMessage(person);
    }
}
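As on the producer side, the listener's wiring is not shown in the article; a minimal consumer configuration sketch with Confluent's KafkaAvroDeserializer might look like the following (class name, group id, and addresses are placeholders). Note the specific-reader flag, without which the deserializer would hand back a GenericRecord instead of PersonDto:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import com.purnima.jain.avro.dto.PersonDto;

// Sketch only: addresses and group id below are placeholders.
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, PersonDto> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "person-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        // Deserialize into the generated PersonDto rather than a GenericRecord
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PersonDto> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, PersonDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```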
There are, of course, many patterns that accomplish the same thing; this is just one way to get a strong separation of concerns between the various parts of your application, by no means the only one.
Hexagonal Architecture combined with a multi-module-maven project can give you tremendously strong isolation and boundaries and you can spend hours having interesting conversations at dinner-parties about where to put which Java class.
Source Code
You can download the source-code here.