Kafka Security With SASL and ACL
Tutorial covering authentication using SCRAM, authorization using Kafka ACLs, encryption using SSL, and producing/consuming messages with camel-kafka.
Red Hat AMQ Streams
Red Hat AMQ Streams is a massively-scalable, distributed, and high-performance data streaming platform based on the Apache ZooKeeper and Apache Kafka projects.
This blog covers authentication using SCRAM, authorization using Kafka ACLs, encryption using SSL, and connecting to the Kafka cluster with camel-kafka to produce and consume messages using Camel routes.
Encryption and Authentication
AMQ Streams supports encryption and authentication, which is configured as part of the listener configuration.
Listener Configuration
Encryption and authentication in Kafka brokers is configured per listener.
Each listener in the Kafka broker is configured with its own security protocol. The configuration property listener.security.protocol.map defines which listener uses which security protocol: it maps each listener name to its security protocol.
The supported security protocols are:
- PLAINTEXT
Listener without any encryption or authentication.
- SSL
Listener using TLS encryption and, optionally, authentication using TLS client certificates.
- SASL_PLAINTEXT
Listener without encryption but with SASL-based authentication.
- SASL_SSL
Listener with TLS-based encryption and SASL-based authentication.
The following listener configuration enables SASL_SSL:
listeners=SASL_SSL://localhost:9092
advertised.listeners=SASL_SSL://localhost:9092
security.inter.broker.protocol=SASL_SSL
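When a broker exposes more than one listener, listener.security.protocol.map ties each listener name to its protocol. A hypothetical two-listener variant (an internal plaintext listener for broker-to-broker traffic plus the external SASL_SSL listener used in this post; note that inter.broker.listener.name replaces security.inter.broker.protocol in this variant) could look like:
listeners=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
advertised.listeners=INTERNAL://localhost:9093,EXTERNAL://localhost:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SASL_SSL
inter.broker.listener.name=INTERNAL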
TLS Encryption
In order to use TLS encryption and server authentication, a keystore containing private and public keys has to be provided. This is usually done using a file in the Java KeyStore (JKS) format. A path to this file is set in the ssl.keystore.location property, and the password protecting the keystore is set in the ssl.keystore.password property.
Generate TLS certificates for all Kafka brokers in your cluster. The certificates should have their advertised and bootstrap addresses in their Common Name or Subject Alternative Name.
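The article does not show the exact commands used to create the keystore and truststore; for a local test like this, a self-signed certificate generated with keytool is one way to do it. A rough sketch, reusing the file names and password that appear later in this post:
# create the broker keystore with the bootstrap address (localhost) in the SAN
keytool -genkeypair -alias kafka-broker -keyalg RSA -keysize 2048 -validity 365 \
  -dname "CN=localhost" -ext "SAN=DNS:localhost" \
  -keystore kafka.keystore.jks -storepass kkr123 -keypass kkr123
# export the broker certificate and import it into the truststore that clients will use
keytool -exportcert -alias kafka-broker -keystore kafka.keystore.jks -storepass kkr123 -file kafka-broker.crt
keytool -importcert -alias kafka-broker -file kafka-broker.crt -keystore kafka.truststore.jks -storepass kkr123 -noprompt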
Edit the /opt/kafka/config/server.properties Kafka configuration file on all cluster nodes as follows (a minimal fragment is sketched after this list):
- Change the listener.security.protocol.map field to specify the SSL protocol for the listener where you want to use TLS encryption.
- Set the ssl.keystore.location option to the path to the JKS keystore with the broker certificate.
- Set the ssl.keystore.password option to the password you used to protect the keystore.
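A minimal sketch of that fragment of server.properties, with placeholder path and password rather than the exact values used later in this post:
listeners=SSL://0.0.0.0:9092
listener.security.protocol.map=SSL:SSL
security.inter.broker.protocol=SSL
ssl.keystore.location=/opt/kafka/config/keystore/kafka.keystore.jks
ssl.keystore.password=changeit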
Download Apache Kafka and Start ZooKeeper
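Download and extract an Apache Kafka (or AMQ Streams) distribution, then start ZooKeeper with the script shipped in the distribution:
./bin/zookeeper-server-start.sh ./config/zookeeper.properties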
SASL Authentication
SASL authentication is configured using the Java Authentication and Authorization Service (JAAS). JAAS is also used for authentication of connections between Kafka and ZooKeeper. JAAS uses its own configuration file. The recommended location for this file is /opt/kafka/config/jaas.conf.
SASL authentication is supported both through plain unencrypted connections as well as through TLS connections. SASL can be enabled individually for each listener. To enable it, the security protocol in listener.security.protocol.map has to be either SASL_PLAINTEXT or SASL_SSL.
SASL authentication in Kafka supports several different mechanisms:
- PLAIN
Implements authentication based on username and passwords. Usernames and passwords are stored locally in Kafka configuration.
- SCRAM-SHA-256 and SCRAM-SHA-512
Implements authentication using Salted Challenge Response Authentication Mechanism (SCRAM). SCRAM credentials are stored centrally in ZooKeeper. SCRAM can be used in situations where ZooKeeper cluster nodes are running isolated in a private network.
- GSSAPI
Implements authentication against a Kerberos server.
The SASL mechanisms are configured via the JAAS configuration file. Kafka uses the JAAS context named KafkaServer. After they are configured in JAAS, the SASL mechanisms have to be enabled in the Kafka configuration. This is done using the sasl.enabled.mechanisms property.
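In this post only SCRAM-SHA-512 is enabled, for clients and for inter-broker communication alike (see the full server.properties further down):
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512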
- SASL SCRAM
SCRAM authentication in Kafka consists of two mechanisms: SCRAM-SHA-256 and SCRAM-SHA-512. These mechanisms differ only in the hashing algorithm used: SHA-256 versus the stronger SHA-512. To enable SCRAM authentication, the JAAS configuration file has to include the following configuration:
Sample ${kafka-home}/config/kafka_server_jaas.conf file:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin123";
};
Then enable SASL authentication in the server.properties file:
# With SASL & SSL encryption
scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin123";
Create ssl-user-config.properties in ${kafka-home}/config:
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ramu" password="ramu123";
ssl.truststore.location=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/truststore/kafka.truststore.jks
ssl.truststore.password=kkr123
User credentials for the SCRAM mechanism are stored in ZooKeeper. The kafka-configs.sh tool can be used to manage them:
./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-512=[password='admin123']' --entity-type users --entity-name admin
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'admin'.
./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-512=[password='ramu123']' --entity-type users --entity-name ramu
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'ramu'.
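To double-check that the credentials were written, the same tool can describe the user entity (sticking with the deprecated --zookeeper flag used above):
./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name admin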
The complete ${kafka-home}/config/server.properties file looks like the following:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=SASL_SSL://localhost:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=SASL_SSL://localhost:9092
security.inter.broker.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=
ssl.client.auth=required
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
# Broker security settings
ssl.truststore.location=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/truststore/kafka.truststore.jks
ssl.truststore.password=kkr123
ssl.keystore.location=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/keystore/kafka.keystore.jks
ssl.keystore.password=kkr123
ssl.key.password=kkr123
# ACLs
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
# #zookeeper SASL
zookeeper.set.acl=false
# # With SASL & SSL encryption
scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin123";
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
Start Kafka with JAAS
export KAFKA_OPTS=-Djava.security.auth.login.config=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/kafka_server_jaas.conf
./bin/kafka-server-start.sh /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/server.properties
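Optionally, verify that the broker accepts SASL_SSL connections before creating topics, for example with the API-versions tool and the client properties file created earlier:
./bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 --command-config ./config/ssl-user-config.properties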
Create Topic test-topic
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --command-config ./config/ssl-user-config.properties --replication-factor 1 --partitions 1 --topic test-topic
The above command will fail because the user does not yet have Create permission on the topic.
Now assign the required permissions:
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:ramu --operation Create --operation Describe --topic test-topic
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`:
(principal=User:ramu, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:ramu, host=*, operation=CREATE, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`:
(principal=User:ramu, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:ramu, host=*, operation=CREATE, permissionType=ALLOW)
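At any point, the ACLs granted on the topic can be listed to confirm the result:
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic test-topic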
Similarly, grant permissions to the producer and the consumer.
Producer
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:ramu --producer --topic test-topic
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`:
(principal=User:ramu, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:ramu, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:ramu, host=*, operation=CREATE, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`:
(principal=User:ramu, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:ramu, host=*, operation=CREATE, permissionType=ALLOW)
(principal=User:ramu, host=*, operation=WRITE, permissionType=ALLOW)
Now produce some messages:
[kkakarla kafka_2.12-2.5.0.redhat-00003]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --producer.config config/ssl-producer.properties
>Hi this is october
>
The ssl-producer.properties file:
bootstrap.servers=localhost:9092
compression.type=none
### SECURITY ######
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ramu" password="ramu123";
ssl.truststore.location=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/truststore/kafka.truststore.jks
ssl.truststore.password=kkr123
Consumer
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:ramu --consumer --topic test-topic --group test-consumer-group
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`:
(principal=User:ramu, host=*, operation=READ, permissionType=ALLOW)
(principal=User:ramu, host=*, operation=DESCRIBE, permissionType=ALLOW)
Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=test-consumer-group, patternType=LITERAL)`:
(principal=User:ramu, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`:
(principal=User:ramu, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:ramu, host=*, operation=CREATE, permissionType=ALLOW)
(principal=User:ramu, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:ramu, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=test-consumer-group, patternType=LITERAL)`:
(principal=User:ramu, host=*, operation=READ, permissionType=ALLOW)
Now consume the messages:
[kkakarla kafka_2.12-2.5.0.redhat-00003]$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --consumer.config config/ssl-consumer.properties
Hi this is october
The ssl-consumer.properties file:
bootstrap.servers=localhost:9092
# consumer group id
group.id=test-consumer-group
# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=
### SECURITY ######
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ramu" password="ramu123";
ssl.truststore.location=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/truststore/kafka.truststore.jks
ssl.truststore.password=kkr123
Now let's produce and consume messages from a Spring Boot application, using Camel producer and consumer routes.
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

// Assumed to be registered as a Spring bean so Camel's Spring Boot auto-configuration picks it up
@Component
public class KafkaRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // producer: a timer periodically sends a constant message to the secured topic
        from("timer://foo?period={{period}}")
            .setBody(constant("Hi This is kafka example"))
            .to("kafka:{{kafka.topic}}?brokers={{kafka.bootstrap.url}}"
                + "&keySerializerClass=org.apache.kafka.common.serialization.StringSerializer"
                + "&serializerClass=org.apache.kafka.common.serialization.StringSerializer"
                + "&securityProtocol={{security.protocol}}&saslJaasConfig={{sasl.jaas.config}}"
                + "&saslMechanism={{sasl.mechanism}}&sslTruststoreLocation={{ssl.truststore.location}}"
                + "&sslTruststorePassword={{ssl.truststore.password}}&sslTruststoreType={{ssl.truststore.type}}")
            .log("${body}");

        // consumer: reads the same topic with the SCRAM-over-TLS client settings and logs each message
        from("kafka:{{consumer.topic}}?brokers={{kafka.bootstrap.url}}&maxPollRecords={{consumer.max.poll.records}}"
            + "&keySerializerClass=org.apache.kafka.common.serialization.StringSerializer"
            + "&serializerClass=org.apache.kafka.common.serialization.StringSerializer"
            + "&groupId={{consumer.group}}&securityProtocol={{security.protocol}}&saslJaasConfig={{sasl.jaas.config}}"
            + "&saslMechanism={{sasl.mechanism}}&sslTruststoreLocation={{ssl.truststore.location}}"
            + "&sslTruststorePassword={{ssl.truststore.password}}&sslTruststoreType={{ssl.truststore.type}}"
            + "&autoOffsetReset={{consumer.auto.offset.reset}}&autoCommitEnable={{consumer.auto.commit.enable}}")
            .log("${body}");
    }
}
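For reference, the logs below mention an o.a.c.e.kafka.sasl.ssl.Application class; a minimal Spring Boot entry point along those lines (the package name is assumed from the abbreviated logger name, not shown in the article) would be:
package org.apache.camel.example.kafka.sasl.ssl; // assumed from the abbreviated logger name in the logs

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}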
The application.properties file:
kafka.topic=test-topic
kafka.bootstrap.url=localhost:9092
period=10000&repeatCount=5
kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
consumer.topic=test-topic
consumer.group=test-consumer-group
consumer.max.poll.records=1
consumer.threads=10
consumer.consumersCount=1
consumer.auto.offset.reset=earliest
consumer.auto.commit.enable=true
consumer.receive.buffer.bytes=-1
security.protocol = SASL_SSL
ssl.truststore.location =/home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/src/main/truststore/kafka.truststore.jks
ssl.truststore.password = kkr123
ssl.truststore.type = JKS
sasl.mechanism = SCRAM-SHA-512
#sasl.kerberos.service.name=
sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required username="ramu" password="ramu123";
Test
[kkakarla camel-example-kafka-sasl_ssl]$ mvn spring-boot:run
[INFO] Scanning for projects...
[INFO]
[INFO] ---------< org.apache.camel.example:camel-example-kafka-sasl >----------
[INFO] Building Camel :: Example :: Kafka :: sasl 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] >>> spring-boot-maven-plugin:2.2.5.RELEASE:run (default-cli) > test-compile camel-example-kafka-sasl >>>
[INFO]
[INFO] --- build-helper-maven-plugin:3.0.0:add-source (add-source) camel-example-kafka-sasl ---
[INFO] Source directory: /home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/target/generated-sources/sasl added.
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) camel-example-kafka-sasl ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) camel-example-kafka-sasl ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to /home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) camel-example-kafka-sasl ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) camel-example-kafka-sasl ---
[INFO] No sources to compile
[INFO]
[INFO] <<< spring-boot-maven-plugin:2.2.5.RELEASE:run (default-cli) < test-compile camel-example-kafka-sasl <<<
[INFO]
[INFO]
[INFO] --- spring-boot-maven-plugin:2.2.5.RELEASE:run (default-cli) camel-example-kafka-sasl ---
[INFO] Attaching agents: []
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.5.RELEASE)
2020-10-02 13:12:13.520 INFO 13586 --- [ main] o.a.c.e.kafka.sasl.ssl.Application : Starting Application on kkakarla.pnq.csb with PID 13586 (/home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/target/classes started by kkakarla in /home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl)
2020-10-02 13:12:13.522 INFO 13586 --- [ main] o.a.c.e.kafka.sasl.ssl.Application : No active profile set, falling back to default profiles: default
2020-10-02 13:12:14.518 INFO 13586 --- [ main] o.apache.camel.support.LRUCacheFactory : Detected and using LRUCacheFactory: camel-caffeine-lrucache
2020-10-02 13:12:14.689 INFO 13586 --- [ main] o.a.c.s.boot.SpringBootRoutesCollector : Loading additional Camel XML routes from: classpath:camel/*.xml
2020-10-02 13:12:14.689 INFO 13586 --- [ main] o.a.c.s.boot.SpringBootRoutesCollector : Loading additional Camel XML route templates from: classpath:camel-template/*.xml
2020-10-02 13:12:14.689 INFO 13586 --- [ main] o.a.c.s.boot.SpringBootRoutesCollector : Loading additional Camel XML rests from: classpath:camel-rest/*.xml
2020-10-02 13:12:14.772 INFO 13586 --- [ main] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 3.5.0 (camel) is starting
2020-10-02 13:12:14.775 INFO 13586 --- [ main] o.a.c.impl.engine.AbstractCamelContext : StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2020-10-02 13:12:14.775 INFO 13586 --- [ main] o.a.c.impl.engine.AbstractCamelContext : Using HealthCheck: camel-health
2020-10-02 13:12:14.792 INFO 13586 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-512
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/src/main/truststore/kafka.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2020-10-02 13:12:14.918 INFO 13586 --- [ main] o.a.k.c.s.authenticator.AbstractLogin : Successfully logged in.
2020-10-02 13:12:14.986 INFO 13586 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.1
2020-10-02 13:12:14.986 INFO 13586 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 0efa8fb0f4c73d92
2020-10-02 13:12:14.986 INFO 13586 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1601624534985
2020-10-02 13:12:14.991 INFO 13586 --- [ main] o.a.c.i.e.InternalRouteStartupManager : Route: route1 started and consuming from: timer://foo
2020-10-02 13:12:14.991 INFO 13586 --- [ main] o.a.camel.component.kafka.KafkaConsumer : Starting Kafka consumer on topic: test-topic with breakOnFirstError: false
2020-10-02 13:12:14.996 INFO 13586 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test-consumer-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-512
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/src/main/truststore/kafka.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2020-10-02 13:12:15.016 WARN 13586 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'specific.avro.reader' was supplied but isn't a known config.
2020-10-02 13:12:15.016 INFO 13586 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.1
2020-10-02 13:12:15.016 INFO 13586 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 0efa8fb0f4c73d92
2020-10-02 13:12:15.016 INFO 13586 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1601624535016
2020-10-02 13:12:15.017 INFO 13586 --- [ main] o.a.c.i.e.InternalRouteStartupManager : Route: route2 started and consuming from: kafka://test-topic
2020-10-02 13:12:15.017 INFO 13586 --- [mer[test-topic]] o.a.camel.component.kafka.KafkaConsumer : Subscribing test-topic-Thread 0 to topic test-topic
2020-10-02 13:12:15.018 INFO 13586 --- [mer[test-topic]] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Subscribed to topic(s): test-topic
2020-10-02 13:12:15.020 INFO 13586 --- [ main] o.a.c.impl.engine.AbstractCamelContext : Total 2 routes, of which 2 are started
2020-10-02 13:12:15.021 INFO 13586 --- [ main] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 3.5.0 (camel) started in 0.246 seconds
2020-10-02 13:12:15.030 INFO 13586 --- [ main] o.a.c.e.kafka.sasl.ssl.Application : Started Application in 1.721 seconds (JVM running for 1.985)
2020-10-02 13:12:15.034 INFO 13586 --- [extShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 3.5.0 (camel) is shutting down
2020-10-02 13:12:15.035 INFO 13586 --- [extShutdownHook] o.a.c.i.engine.DefaultShutdownStrategy : Starting to graceful shutdown 2 routes (timeout 45 seconds)
2020-10-02 13:12:15.036 INFO 13586 --- [ - ShutdownTask] o.a.camel.component.kafka.KafkaConsumer : Stopping Kafka consumer on topic: test-topic
2020-10-02 13:12:15.315 INFO 13586 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: TIW2NTETQmeyjTIzNCKdIg
2020-10-02 13:12:15.318 INFO 13586 --- [mer[test-topic]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Cluster ID: TIW2NTETQmeyjTIzNCKdIg
2020-10-02 13:12:15.319 INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2020-10-02 13:12:15.321 INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] (Re-)joining group
2020-10-02 13:12:15.390 INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-10-02 13:12:15.390 INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] (Re-)joining group
2020-10-02 13:12:15.394 INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Finished assignment for group at generation 16: {consumer-test-consumer-group-1-6f265a6e-422f-4651-b442-a48638bcc2ee=Assignment(partitions=[test-topic-0])}
2020-10-02 13:12:15.398 INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Successfully joined group with generation 16
2020-10-02 13:12:15.401 INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Adding newly assigned partitions: test-topic-0
2020-10-02 13:12:15.411 INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Setting offset for partition test-topic-0 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2020-10-02 13:12:16.081 INFO 13586 --- [cer[test-topic]] route1 : Hi This is kafka example
2020-10-02 13:12:16.082 INFO 13586 --- [mer[test-topic]] route2 : Hi This is kafka example
Enjoy! See you with another article soon.