Develop Camel-Quarkus Applications Using Red Hat
In this article, we demonstrate how to publish and consume messages from Kafka Topic using camel-quarkus-Kafka.
Join the DZone community and get the full member experience.
Join For FreeRed Hat Open Shift
Red Hat Open Shift offers a consistent hybrid-cloud foundation for building and scaling containerized applications. Open Shift provides an enterprise-grade, container-based platform with no vendor lock-in. Red Hat was one of the first companies to work with Google on Kubernetes, even before launch, and has become the second leading contributor to the Kubernetes upstream project. Open Shift also provides a common development platform no matter what infrastructure we use to host the application.
Quarkus
Quarkus provides a container-first approach to building Java applications. This approach makes it much easier to build microservices-based applications written in Java as well as enabling those applications to invoke functions running on serverless computing frameworks. For this reason, Quarkus applications have small memory footprints and fast start-up times.
For this demonstration, we chose to run our camel Quarkus apps on open shift. Running on Open Shift Container Platform means that our demo applications can run anywhere that Open Shift runs, which includes bare metal, Amazon Web Services (AWS), Azure, Google Cloud, IBM Cloud and more
Red Hat AMQ Streams
Red Hat AMQ Streams is a massively-scalable, distributed, and high-performance data streaming platform based on the Apache ZooKeeper and Apache Kafka projects.
The main components comprise:
- Kafka Broker
-
Messaging broker responsible for delivering records from producing clients to consuming clients.
Apache ZooKeeper is a core dependency for Kafka, providing a cluster coordination service for highly reliable distributed coordination.
AMQ Streams architecture
Apache Camel
Apache Camel is an open-source framework for message-oriented middleware with a rule-based routing and mediation engine that provides a Java object-based implementation of the Enterprise Integration Patterns using an application programming interface to configure routing and mediation rules
Prerequisites
For this demonstration, you will need the following technologies set up in your development environment:
- An Open Shift 4.3+ environment with Cluster Admin access
- Open shift CLI (
oc
) - Apache Maven 3.6.3+
- JDK 11 Installed
- Kafka cluster installed on openshift 4
In this article, we demonstrate how to publish and consume messages from Kafka Topic using camel-quarkus-Kafka.
Getting Started With Red Hat Quarkus Build
From Red, Hat Quarkus configure your application.
select extensions camel-core,camel-Kafka,camel-timer and generate your application
The pom file
x
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>org.acme</groupId>
<artifactId>camel-quarkus-kafka</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<compiler-plugin.version>3.8.1</compiler-plugin.version>
<maven.compiler.parameters>true</maven.compiler.parameters>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus-plugin.version>1.3.4.Final-redhat-00001</quarkus-plugin.version>
<quarkus.platform.artifact-id>quarkus-universe-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>com.redhat.quarkus</quarkus.platform.group-id>
<quarkus.platform.version>1.3.4.Final-redhat-00001</quarkus.platform.version>
<surefire-plugin.version>2.22.1</surefire-plugin.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>${quarkus.platform.artifact-id}</artifactId>
<version>${quarkus.platform.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-log</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-timer</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${compiler-plugin.version}</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<systemProperties>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
</systemProperties>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>native</id>
<activation>
<property>
<name>native</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<systemProperties>
<native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
</systemProperties>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<quarkus.package.type>native</quarkus.package.type>
</properties>
</profile>
</profiles>
</project>
Single Node Kafka Cluster Set up in Openshift 4
xxxxxxxxxx
pods
my-cluster-entity-operator-cb446ff8f-5ncfj 3/3 Running 1 5d22h
my-cluster-kafka-0 2/2 Running 0 19h
my-cluster-zookeeper-0 1/1 Running 0 5d22h
strimzi-cluster-operator-86bb9f6ccd-m5nsk 1/1 Running 0 5d22h
services
my-cluster-kafka-0 ClusterIP xxx.yyy.149 <none> 9094/TCP 19h
my-cluster-kafka-bootstrap ClusterIP xxx.yy.216.1 <none> 9091/TCP 5d22h
my-cluster-kafka-brokers ClusterIP None <none> 9091/TCP 5d22h
my-cluster-kafka-external-bootstrap ClusterIP xxx.yy.42.219 <none> 9094/TCP 19h
my-cluster-zookeeper-client ClusterIP xxx.yy.160.224 <none> 2181/TCP 5d22h
my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 5d22h
route
my-cluster-kafka-0 my-cluster-kafka-0-kafka.apps.kakarlatest2.lab.xxx.yyy.com my-cluster-kafka-0 9094 passthrough None
my-cluster-kafka-bootstrap my-cluster-kafka-bootstrap-kafka.apps.kakarlatest2.lab.xxx.yyy.com my-cluster-kafka-external-bootstrap 9094 passthrough None
Follow below steps to extract openshift Kafka cluster certs, as the app needs to connect Kafka cluster set up in openshift 4
To Run the example as an external client, first extract and import the openshift Kafka cluster cert
xxxxxxxxxx
oc extract secret/my-cluster-cluster-ca-cert --keys=ca.crt --to=- > src/main/resources/ca.crt
keytool -keystore src/main/resources/client.truststore.jks -alias CARoot -import -file src/main/resources/ca.crt
keytool -import -trustcacerts -alias root -file src/main/resources/ca.crt -keystore src/main/resources/client.truststore.jks -storepass xxxxxx -noprompt
Camel Kafka Route Builder Class
Write camel Kafka Producer Route to publish messages to Kafka's topic.
xxxxxxxxxx
from("timer://foo?period={{period}}").setBody(constant("post message to kafka cluster topic"))
.to("kafka:{{kafka.topic}}?brokers={{kafka.external.bootstrap.url}}"
+ "&keySerializerClass={{kafka.key.serializer}}&serializerClass={{kafka.value.serializer}}"
+ "&securityProtocol=SSL&sslTruststoreLocation={{truststore}}"
+ "&sslTruststorePassword={{truststore.password}}")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>)
exchange.getIn() .getHeader(KafkaConstants.KAFKA_RECORDMETA);
for (RecordMetadata rd : recordMetaData1) {
LOG.info("producer partition is:" + rd.partition());
LOG.info("producer partition message is:" + rd.toString());
}
}
});
Place holders used in Camel Route is defined in Application.properties file
x
kafka.topic=my-topic
kafka.external.bootstrap.url=my-cluster-kafka-bootstrap-kafka.apps.kakarlatest2.lab.xxx.yyy.com:443
period=10000&repeatCount=5
kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
truststore=/home/kkakarla/Downloads/camel-quarkus-kafka/src/main/resources/client.truststore.jks
truststore.password=xxxxxx
Tree structure of camel-quarkus-Kafka
xxxxxxxxxx
[kkakarla camel-quarkus-kafka]$ tree
.
├── mvnw
├── mvnw.cmd
├── pom.xml
├── README.md
├── src
│ ├── main
│ │ ├── docker
│ │ │ ├── Dockerfile.jvm
│ │ │ └── Dockerfile.native
│ │ ├── java
│ │ │ └── org
│ │ │ └── acme
│ │ │ ├── ExampleResource.java
│ │ │ └── KafkaRoute.java
│ │ └── resources
│ │ ├── application.properties
│ │ ├── ca.crt
│ │ ├── client.truststore.jks
│ │ └── META-INF
│ │ └── resources
│ │ └── index.html
│ └── test
│ └── java
│ └── org
│ └── acme
│ ├── ExampleResourceTest.java
│ └── NativeExampleResourceIT.java
└── target
14 directories, 14 files
Run the application in dev mode
xxxxxxxxxx
[kkakarla camel-quarkus-kafka]$ ./mvnw quarkus:dev
[INFO] Scanning for projects...
[INFO]
[INFO] --------------------< org.acme:camel-quarkus-kafka >--------------------
[INFO] Building camel-quarkus-kafka 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- quarkus-maven-plugin:1.3.4.Final-redhat-00001:dev (default-cli) camel-quarkus-kafka ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to /home/kkakarla/development/git/ramu-dev/quarkus/camel-quarkus-kafka/target/classes
[INFO] /home/kkakarla/development/git/ramu-dev/quarkus/camel-quarkus-kafka/src/main/java/org/acme/KafkaRoute.java: /home/kkakarla/development/git/ramu-dev/quarkus/camel-quarkus-kafka/src/main/java/org/acme/KafkaRoute.java uses unchecked or unsafe operations.
[INFO] /home/kkakarla/development/git/ramu-dev/quarkus/camel-quarkus-kafka/src/main/java/org/acme/KafkaRoute.java: Recompile with -Xlint:unchecked for details.
Listening for transport dt_socket at address: 5005
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2020-07-10 11:39:13,455 INFO [org.apa.cam.sup.LRUCacheFactory] (main) Detected and using LURCacheFactory: camel-caffeine-lrucache
2020-07-10 11:39:13,685 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.1.0 (CamelContext: camel-1) is starting
-----------------------------------------------------------------------------
-----------------------------------------------------------------------------
-----------------------------------------------------------------------------
sasl.mechanism = GSSAPI
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /home/kkakarla/development/git/ramu-dev/quarkus/camel-quarkus-kafka/src/main/resources/client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2020-07-10 11:40:32,475 WARN [org.apa.kaf.cli.pro.KafkaProducer] (Timer-0) [Producer clientId=producer-2] delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms. Setting it to 305000.
2020-07-10 11:40:32,946 INFO [org.apa.kaf.com.uti.AppInfoParser] (Timer-0) Kafka version: 2.4.0.redhat-00005
2020-07-10 11:40:32,946 INFO [org.apa.kaf.com.uti.AppInfoParser] (Timer-0) Kafka commitId: bc61f1c575849a1e
2020-07-10 11:40:32,946 INFO [org.apa.kaf.com.uti.AppInfoParser] (Timer-0) Kafka startTimeMs: 1594361432945
2020-07-10 11:40:32,950 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (Timer-0) Route: route2 started and consuming from: timer://foo
2020-07-10 11:40:32,953 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (Timer-0) Total 1 routes, of which 1 are started
2020-07-10 11:40:32,953 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (Timer-0) Apache Camel 3.1.0 (CamelContext: camel-2) started in 0.488 seconds
2020-07-10 11:40:32,954 INFO [io.quarkus] (Timer-0) camel-quarkus-kafka 1.0.0-SNAPSHOT (powered by Quarkus 1.3.0.Final) started in 81.008s. Listening on: http://0.0.0.0:8080
2020-07-10 11:40:32,955 INFO [io.quarkus] (Timer-0) Profile dev activated. Live Coding activated.
2020-07-10 11:40:32,955 INFO [io.quarkus] (Timer-0) Installed features: [camel-core, camel-file, camel-kafka, camel-log, camel-support-common, camel-timer, cdi, kubernetes, resteasy]
2020-07-10 11:40:32,956 INFO [io.qua.dev] (Timer-0) Hot replace total time: 0.952s
2020-07-10 11:40:35,481 INFO [org.apa.kaf.cli.Metadata] (kafka-producer-network-thread | producer-2) [Producer clientId=producer-2] Cluster ID: P_Ka2jxLRd6xlrVD2jHFjg
2020-07-10 11:40:38,114 INFO [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #2 - KafkaProducer[my-topic]) producer partition is:0
2020-07-10 11:40:38,115 INFO [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #2 - KafkaProducer[my-topic]) producer partition message is:my-topic-0
2020-07-10 11:40:44,345 INFO [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #4 - KafkaProducer[my-topic]) producer partition is:0
2020-07-10 11:40:44,346 INFO [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #4 - KafkaProducer[my-topic]) producer partition message is:my-topic-0
2020-07-10 11:40:54,350 INFO [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #6 - KafkaProducer[my-topic]) producer partition is:0
2020-07-10 11:40:54,351 INFO [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #6 - KafkaProducer[my-topic]) producer partition message is:my-topic-0
2020-07-10 11:41:04,342 INFO [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #8 - KafkaProducer[my-topic]) producer partition is:0
2020-07-10 11:41:04,343 INFO [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #8 - KafkaProducer[my-topic]) producer partition message is:my-topic-0
2020-07-10 11:41:14,567 INFO [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #10 - KafkaProducer[my-topic]) producer partition is:0
2020-07-10 11:41:14,567 INFO [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #10 - KafkaProducer[my-topic]) producer partition message is:my-topic-0
Published 5 messages to partition 0
Write camel Kafka Consumer Route to consume messages from Kafka topic.
xxxxxxxxxx
from("timer://foo?period={{period}}")
.from("kafka:{{kafka.topic}}?brokers={{kafka.external.bootstrap.url}}&securityProtocol=SSL"
+ "&sslTruststoreLocation={{truststore}}&"
+ "sslTruststorePassword={{truststore.password}}&groupId=cameltest")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}");
Run the consumer application in dev mode
x
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /home/kkakarla/development/git/ramu-dev/quarkus/camel-quarkus-kafka-consumer/src/main/resources/client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2020-07-10 11:52:07,840 WARN [org.apa.kaf.cli.con.ConsumerConfig] (Timer-0) The configuration 'specific.avro.reader' was supplied but isn't a known config.
2020-07-10 11:52:07,841 INFO [org.apa.kaf.com.uti.AppInfoParser] (Timer-0) Kafka version: 2.4.0.redhat-00005
2020-07-10 11:52:07,842 INFO [org.apa.kaf.com.uti.AppInfoParser] (Timer-0) Kafka commitId: bc61f1c575849a1e
2020-07-10 11:52:07,842 INFO [org.apa.kaf.com.uti.AppInfoParser] (Timer-0) Kafka startTimeMs: 1594362127841
2020-07-10 11:52:07,843 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (Timer-0) Route: route2 started and consuming from: kafka://my-topic
2020-07-10 11:52:07,844 INFO [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) Subscribing my-topic-Thread 0 to topic my-topic
2020-07-10 11:52:07,845 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (Timer-0) Total 1 routes, of which 1 are started
2020-07-10 11:52:07,845 INFO [org.apa.kaf.cli.con.KafkaConsumer] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Subscribed to topic(s): my-topic
2020-07-10 11:52:07,845 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (Timer-0) Apache Camel 3.1.0 (CamelContext: camel-2) started in 0.454 seconds
2020-07-10 11:52:07,846 INFO [io.quarkus] (Timer-0) camel-quarkus-kafka-consumer 1.0.0-SNAPSHOT (powered by Quarkus 1.3.0.Final) started in 74.773s. Listening on: http://0.0.0.0:8080
2020-07-10 11:52:07,846 INFO [io.quarkus] (Timer-0) Profile dev activated. Live Coding activated.
2020-07-10 11:52:07,846 INFO [io.quarkus] (Timer-0) Installed features: [camel-core, camel-file, camel-kafka, camel-log, camel-support-common, camel-timer, cdi, kubernetes, resteasy]
2020-07-10 11:52:07,847 INFO [io.qua.dev] (Timer-0) Hot replace total time: 0.732s
2020-07-10 11:52:10,540 INFO [org.apa.kaf.cli.Metadata] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Cluster ID: P_Ka2jxLRd6xlrVD2jHFjg
2020-07-10 11:52:10,630 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Discovered group coordinator my-cluster-kafka-0-kafka.apps.kakarlatest2.lab.xxx.yyy.com:443 (id: 2147483647 rack: null)
2020-07-10 11:52:11,153 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] (Re-)joining group
2020-07-10 11:52:13,918 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] (Re-)joining group
2020-07-10 11:52:17,500 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Finished assignment for group at generation 17: {consumer-cameltest-2-386c3314-a3ff-44b3-9612-6378ffc4d64a=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment }
2020-07-10 11:52:17,916 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Successfully joined group with generation 17
2020-07-10 11:52:17,919 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Adding newly assigned partitions: my-topic-0
2020-07-10 11:52:18,526 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Setting offset for partition my-topic-0 to the committed offset FetchPosition{offset=20, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-0-kafka.apps.kakarlatest2.lab.xxx.yyy.com:443 (id: 0 rack: null), epoch=0}}
2020-07-10 11:52:20,929 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) Message received from Kafka : post message to kafka cluster topic
2020-07-10 11:52:20,930 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) on the topic my-topic
2020-07-10 11:52:20,930 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) on the partition 0
2020-07-10 11:52:20,930 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) with the offset 20
2020-07-10 11:52:20,930 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) with the key
2020-07-10 11:52:20,931 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) Message received from Kafka : post message to kafka cluster topic
2020-07-10 11:52:20,931 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) on the topic my-topic
2020-07-10 11:52:20,931 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) on the partition 0
2020-07-10 11:52:20,932 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) with the offset 21
2020-07-10 11:52:20,932 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) with the key
2020-07-10 11:52:20,932 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) Message received from Kafka : post message to kafka cluster topic
2020-07-10 11:52:20,932 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) on the topic my-topic
2020-07-10 11:52:20,932 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) on the partition 0
2020-07-10 11:52:20,932 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) with the offset 22
2020-07-10 11:52:20,932 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) with the key
2020-07-10 11:52:20,932 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) Message received from Kafka : post message to kafka cluster topic
2020-07-10 11:52:20,933 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) on the topic my-topic
2020-07-10 11:52:20,933 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) on the partition 0
2020-07-10 11:52:20,933 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) with the offset 23
2020-07-10 11:52:20,933 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) with the key
2020-07-10 11:52:20,933 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) Message received from Kafka : post message to kafka cluster topic
2020-07-10 11:52:20,933 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) on the topic my-topic
2020-07-10 11:52:20,933 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) on the partition 0
2020-07-10 11:52:20,933 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) with the offset 24
2020-07-10 11:52:20,933 INFO [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) with the key
Deploy Application to Openshift 4.x Cluster
1. Change to the directory that contains your Quarkus Maven project.
2. To add the Open Shift extension to an existing project, enter the following command:
xxxxxxxxxx
./mvnw quarkus:add-extension -Dextensions="openshift"
3. When you add the OpenShift extension, the script adds the following dependency to the pom.xml
file:
xxxxxxxxxx
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-openshift</artifactId>
</dependency>
4. If you are using an untrusted certificate, add the following line to the src/main/resources/application.properties
file:
xxxxxxxxxx
quarkus.s2i.base-jvm-image=registry.access.redhat.com/openjdk/openjdk-11-rhel7
5. Log in to the Open Shift CLI (oc) and create a new Open Shift project
6. To deploy your project to Open Shift, enter the following command in your Quarkus Maven project directory
xxxxxxxxxx
./mvnw clean package -Dquarkus.kubernetes.deploy=true
Check the pods
xxxxxxxxxx
NAME READY STATUS RESTARTS AGE
camel-quarkus-kafka-1-build 0/1 Completed 0 4m21s
camel-quarkus-kafka-1-deploy 0/1 Completed 0 56s
camel-quarkus-kafka-1-z9r65 1/1 Running 0 82s
Check the camel-quarkus-Kafka-1-z9r65 logs using command oc logs -f camel-quarkus-kafka-1-z9r65
Here Kafka cluster service bootstrap URL should be used in camel route if app is deployed in same openshift where Kafka cluster is set up
Hope this article helps others who want to try camel-quarkus-Kafka
Opinions expressed by DZone contributors are their own.
Comments