Apache Camel Integration with Kafka
This article covers the integration of Apache Camel with Kafka, from setup to testing, with code blocks and plenty of pictures!
This article covers Apache Camel integration with Kafka.
Setup:
Kafka Setup
We will launch Kafka as a Docker container.
docker-compose.yml
version: '2'
services:
  # this is our kafka cluster.
  kafka-cluster:
    image: landoop/fast-data-dev:cp3.3.0
    environment:
      ADV_HOST: 127.0.0.1         # Change to 192.168.99.100 if using Docker Toolbox
      RUNTESTS: 0                 # Disable Running tests so the cluster starts faster
      FORWARDLOGS: 0              # Disable running 5 file source connectors that bring application logs into Kafka topics
      SAMPLEDATA: 0               # Do not create sea_vessel_position_reports, nyc_yellow_taxi_trip_data, reddit_posts topics with sample Avro records.
    ports:
      - 2181:2181                 # Zookeeper
      - 3030:3030                 # Landoop UI
      - 8081-8083:8081-8083       # REST Proxy, Schema Registry, Kafka Connect ports
      - 9581-9585:9581-9585       # JMX Ports
      - 9092:9092                 # Kafka Broker
From the directory containing the docker-compose.yml file, run the command below and confirm that the Kafka cluster starts successfully.
docker-compose up
A screen like the below opens:
Let us create two Spring Boot Camel microservices, camel-demo-a and camel-demo-b. camel-demo-a will publish data to a Kafka topic, which camel-demo-b will consume.
In the pom.xml of both microservices, add the dependency below.
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-kafka-starter</artifactId>
<version>3.8.0</version>
</dependency>
Configure the Kafka broker URL in application.properties:
camel.component.kafka.brokers=localhost:9092
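Before starting either service, it can help to confirm that the broker address configured above is actually reachable. The following is a minimal sketch using only the Java standard library. The class name BrokerPortCheck and the method isPortOpen are hypothetical helpers, not part of Camel or Kafka, and an open port only shows that something is listening there, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Camel or Kafka): checks whether a TCP port accepts connections.
final class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:9092 matches camel.component.kafka.brokers in application.properties.
        boolean open = isPortOpen("localhost", 9092, 2000);
        System.out.println("Kafka broker port 9092 reachable: " + open);
    }
}
```

If the check reports that the port is not reachable, the Docker container is probably still starting, or ADV_HOST needs to be adjusted as noted in the docker-compose.yml comments.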
Configuring the KafkaSenderRoute in camel-demo-a
The route reads files from the files/input directory and publishes their contents to the Kafka topic mytopic:
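package com.vignesh.cameldemoa.routes.a;

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

// Registered as a Spring bean so that Camel Spring Boot discovers and starts the route.
@Component
public class KafkaSenderRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // Poll files/input and publish each file's content to the Kafka topic "mytopic".
        from("file:files/input")
            .to("kafka:mytopic");
    }
}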
Configuring the KafkaReceiverRoute in camel-demo-b
Let us assume that the sender route publishes a JSON message, which we will unmarshal and process.
To the pom.xml of the camel-demo-b application, add the dependency below:
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-jackson-starter</artifactId>
<version>3.8.0</version>
</dependency>
Creating the Model class:
package com.vignesh.cameldemob.model;

public class Employee {

    private int id;
    private String name;

    // No-argument constructor, used by Jackson when unmarshalling.
    public Employee() {
    }

    public Employee(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "Employee{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}
The route consumes the message from the Kafka topic, unmarshals it into an Employee using the Jackson JSON library, passes it to a bean for processing, and logs it.
package com.vignesh.cameldemob.route.b;

import com.vignesh.cameldemob.model.Employee;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class KafkaReceiverRoute extends RouteBuilder {

    // Injected by Spring; GetEmployee is declared as a component below.
    @Autowired
    GetEmployee getEmployee;

    @Override
    public void configure() throws Exception {
        from("kafka:mytopic")
            // Convert the JSON message body into an Employee object.
            .unmarshal().json(JsonLibrary.Jackson, Employee.class)
            // Pass the Employee to the bean's getData method.
            .bean(getEmployee)
            .to("log:myloggingqueue");
    }
}

@Component
class GetEmployee {

    Logger logger = LoggerFactory.getLogger(GetEmployee.class);

    public void getData(Employee employee) {
        logger.info("Emp data: " + employee.getId());
    }
}
Testing:
Start the camel-demo-a application and place a JSON file in the files/input folder.
The file will be read and the message will be published on the Kafka topic.
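For a quick test, the input file can also be created programmatically. The sketch below writes a small JSON document whose fields match the Employee model (id and name) into the files/input directory that the sender route polls. The class name SampleEmployeeFile, the file name employee.json, and the sample values are assumptions made for illustration only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: creates a sample input file for the sender route.
final class SampleEmployeeFile {

    // Sample payload; the field names match the Employee model (id, name), the values are made up.
    static final String SAMPLE_JSON = "{\"id\": 1, \"name\": \"John\"}";

    // Writes SAMPLE_JSON to <inputDir>/employee.json, creating the directory if needed.
    static Path writeSample(Path inputDir) {
        try {
            Files.createDirectories(inputDir);
            Path file = inputDir.resolve("employee.json");
            Files.write(file, SAMPLE_JSON.getBytes(StandardCharsets.UTF_8));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // files/input is the directory used in from("file:files/input"), relative to the working directory.
        Path written = writeSample(Paths.get("files", "input"));
        System.out.println("Wrote " + written.toAbsolutePath());
    }
}
```

Run it from the working directory of camel-demo-a so that the relative path resolves to the folder the route watches.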
Start the camel-demo-b application. Observe that the route consumes the message from the Kafka topic, unmarshals the JSON, and processes it further.