Running Kafka in Kubernetes With KRaft Mode and SSL
In this article, learn how to launch an Apache Kafka cluster with the Apache Kafka Raft (KRaft) consensus protocol and SSL encryption.
Prerequisites
An understanding of Apache Kafka, Kubernetes, and Minikube.
The following steps were initially taken on a MacBook Pro with 32 GB of memory running macOS Ventura v13.4.
Make sure to have the following applications installed:
- Docker v23.0.5
- Minikube v1.29.0 (running K8s v1.26.1 internally)
It's possible the steps below will work with different versions of the above tools, but if you run into unexpected issues, you'll want to ensure you have identical versions. Minikube was chosen for this exercise due to its focus on local development.
Deployment Components
Server Keys and Certificates
The first step to enable SSL encryption is to create a public/private key pair for every server.
⚠️ The commands in this section were executed in a Docker container running the image openjdk:11.0.10-jre because it's the same Java version (Java 11) that Confluent runs. This approach avoids Java version-related issues.
The following commands follow the Confluent Security Tutorial:
docker run -it --rm \
--name openjdk \
--mount source=kafka-certs,target=/app \
openjdk:11.0.10-jre
Once in the Docker container:
keytool -keystore kafka-0.server.keystore.jks -alias kafka-0 -keyalg RSA -genkey
Output:
Enter keystore password:
Re-enter new password:
What is your first and last name?
  [Unknown]:  kafka-0.kafka-headless.kafka.svc.cluster.local
What is the name of your organizational unit?
  [Unknown]:  test
What is the name of your organization?
  [Unknown]:  test
What is the name of your City or Locality?
  [Unknown]:  Liverpool
What is the name of your State or Province?
  [Unknown]:  Merseyside
What is the two-letter country code for this unit?
  [Unknown]:  UK
Is CN=kafka-0.kafka-headless.kafka.svc.cluster.local, OU=test, O=test, L=Liverpool, ST=Merseyside, C=UK correct?
  [no]:  yes
Repeating the command for each broker:
keytool -keystore kafka-1.server.keystore.jks -alias kafka-1 -keyalg RSA -genkey
keytool -keystore kafka-2.server.keystore.jks -alias kafka-2 -keyalg RSA -genkey
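The prompts above can also be answered on the command line, which helps when the three keystores need to be regenerated. The following is a minimal sketch, assuming the same distinguished-name values as the prompts shown earlier. STORE_PASS and the helper names broker_dname and generate_keystores are placeholders introduced here for illustration and are not part of the original tutorial.

```shell
#!/bin/sh
set -eu

# Assumption: placeholder password; replace it with your own.
STORE_PASS="${STORE_PASS:-change-me}"
DOMAIN="kafka-headless.kafka.svc.cluster.local"

# Print the distinguished name for broker N, mirroring the answers
# given to the interactive prompts.
broker_dname() {
  printf 'CN=kafka-%s.%s, OU=test, O=test, L=Liverpool, ST=Merseyside, C=UK\n' "$1" "$DOMAIN"
}

# Create one keystore per broker without prompting.
generate_keystores() {
  for i in 0 1 2; do
    keytool -genkeypair \
      -keystore "kafka-${i}.server.keystore.jks" \
      -alias "kafka-${i}" \
      -keyalg RSA \
      -dname "$(broker_dname "$i")" \
      -storepass "$STORE_PASS" \
      -keypass "$STORE_PASS"
  done
}

# Preview the names that would be written into the certificates.
for i in 0 1 2; do
  broker_dname "$i"
done
```

Running the script only prints the three distinguished names. Calling generate_keystores inside the openjdk container would create the keystore files themselves.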
Create Your Own Certificate Authority (CA)
Generate a CA, which is simply a public/private key pair and a certificate intended to sign other certificates.
openssl req -new -x509 -keyout ca-key -out ca-cert -days 90
Output:
Generating a RSA private key
...+++++
........+++++
writing new private key to 'ca-key'
Enter PEM pass phrase:
Verifying - Enter PEM pass phrase:
-----
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [AU]:UK
State or Province Name (full name) [Some-State]:Merseyside
Locality Name (eg, city) []:Liverpool
Organization Name (eg, company) [Internet Widgits Pty Ltd]:test
Organizational Unit Name (eg, section) []:test
Common Name (e.g. server FQDN or YOUR name) []:*.kafka-headless.kafka.svc.cluster.local
Email Address []:
Add the generated CA to the clients’ truststore so that the clients can trust this CA:
keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
Add the generated CA to the brokers’ truststore so that the brokers can trust this CA.
keytool -keystore kafka-0.server.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka-1.server.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka-2.server.truststore.jks -alias CARoot -importcert -file ca-cert
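The four imports above differ only in the truststore file name, so they can be expressed as a loop. This is a sketch under stated assumptions: STORE_PASS is a placeholder password, and truststore_files and import_ca are hypothetical helper names. The -noprompt option skips the "Trust this certificate?" question that keytool otherwise asks.

```shell
#!/bin/sh
set -eu

# Assumption: placeholder password; replace it with your own.
STORE_PASS="${STORE_PASS:-change-me}"

# Print the truststore file names used in this article, one per line.
truststore_files() {
  echo "kafka.client.truststore.jks"
  for i in 0 1 2; do
    echo "kafka-${i}.server.truststore.jks"
  done
}

# Import ca-cert into every truststore without interactive prompts.
import_ca() {
  truststore_files | while read -r store; do
    keytool -importcert -noprompt \
      -keystore "$store" \
      -alias CARoot \
      -file ca-cert \
      -storepass "$STORE_PASS" < /dev/null
  done
}

# Preview the files that import_ca would create or update.
truststore_files
```

As written, the script only lists the file names. Calling import_ca in the directory that holds ca-cert performs the imports.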
Sign the Certificate
To sign all certificates in the keystore with the CA that you generated:
- Export a certificate signing request (CSR) from each keystore:
keytool -keystore kafka-0.server.keystore.jks -alias kafka-0 -certreq -file cert-file-kafka-0
keytool -keystore kafka-1.server.keystore.jks -alias kafka-1 -certreq -file cert-file-kafka-1
keytool -keystore kafka-2.server.keystore.jks -alias kafka-2 -certreq -file cert-file-kafka-2
- Sign it with the CA:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-kafka-0 -out cert-signed-kafka-0 -days 90 -CAcreateserial -passin pass:${ca-password}
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-kafka-1 -out cert-signed-kafka-1 -days 90 -CAcreateserial -passin pass:${ca-password}
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-kafka-2 -out cert-signed-kafka-2 -days 90 -CAcreateserial -passin pass:${ca-password}
⚠️ Don't forget to substitute ${ca-password} with the pass phrase you chose for the CA key.
- Import both the certificate of the CA and the signed certificate into the broker keystore:
keytool -keystore kafka-0.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka-0.server.keystore.jks -alias kafka-0 -importcert -file cert-signed-kafka-0
keytool -keystore kafka-1.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka-1.server.keystore.jks -alias kafka-1 -importcert -file cert-signed-kafka-1
keytool -keystore kafka-2.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka-2.server.keystore.jks -alias kafka-2 -importcert -file cert-signed-kafka-2
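Before loading the keystores into the cluster, it can be worth checking that a signed certificate really chains back to the CA. The sketch below reproduces the sign-and-verify flow with throwaway, unencrypted keys in a temporary directory, so it does not touch the files created above. The file names broker-key, cert-file, and cert-signed, as well as the demo-ca subject, are made up for illustration.

```shell
#!/bin/sh
set -eu

# Work in a scratch directory so the real keys and certificates are untouched.
workdir="$(mktemp -d)"
cd "$workdir"

# Throwaway CA (unencrypted key, demo subject).
openssl req -new -x509 -newkey rsa:2048 -nodes \
  -keyout ca-key -out ca-cert -days 90 -subj "/CN=demo-ca" 2>/dev/null

# Throwaway broker key and certificate signing request.
openssl req -new -newkey rsa:2048 -nodes \
  -keyout broker-key -out cert-file \
  -subj "/CN=kafka-0.kafka-headless.kafka.svc.cluster.local" 2>/dev/null

# Sign the request with the CA, as in the signing step above.
openssl x509 -req -CA ca-cert -CAkey ca-key \
  -in cert-file -out cert-signed -days 90 -CAcreateserial 2>/dev/null

# Check that the signed certificate chains back to the CA.
verify_result="$(openssl verify -CAfile ca-cert cert-signed)"
echo "$verify_result"
```

For the files produced in this article, the equivalent check would be openssl verify -CAfile ca-cert cert-signed-kafka-0, repeated for each broker.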
⚠️ The keystore and truststore files will be used to create the ConfigMap for our deployment.
ConfigMaps
Create two ConfigMaps, one for the Kafka Broker and another one for our Kafka Client.
Kafka Broker
Create a local folder kafka-ssl and copy the keystore and truststore files into the folder. In addition, create a file broker_creds containing the ${ca-password}.
Your folder should look similar to this:
ls kafka-ssl
broker_creds
kafka-0.server.truststore.jks kafka-1.server.truststore.jks kafka-2.server.truststore.jks
kafka-0.server.keystore.jks kafka-1.server.keystore.jks kafka-2.server.keystore.jks
Create the ConfigMap:
kubectl create configmap kafka-ssl --from-file kafka-ssl -n kafka
kubectl describe configmaps -n kafka kafka-ssl
Output:
Name: kafka-ssl
Namespace: kafka
Labels: <none>
Annotations: <none>
Data
====
broker_creds:
----
<redacted>
BinaryData
====
kafka-0.server.keystore.jks: 5001 bytes
kafka-0.server.truststore.jks: 1306 bytes
kafka-1.server.keystore.jks: 5001 bytes
kafka-1.server.truststore.jks: 1306 bytes
kafka-2.server.keystore.jks: 5001 bytes
kafka-2.server.truststore.jks: 1306 bytes
Events: <none>
Kafka Client
Create a local folder kafka-client and copy the kafka.client.truststore.jks file into the folder. In addition, create a file broker_creds containing the ${ca-password} and a file client_security.properties.
#client_security.properties
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
ssl.truststore.password=<redacted>
Your folder should look similar to this:
ls kafka-client
broker_creds client_security.properties kafka.client.truststore.jks
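The two text files in the client folder can be generated with a short script, which keeps the password in one place. This is a minimal sketch: TRUSTSTORE_PASS and CLIENT_DIR are placeholder variables introduced here, and the truststore file still has to be copied into the folder separately.

```shell
#!/bin/sh
set -eu

# Assumptions: TRUSTSTORE_PASS is a placeholder password, and the output
# directory defaults to the kafka-client folder described above.
TRUSTSTORE_PASS="${TRUSTSTORE_PASS:-change-me}"
CLIENT_DIR="${CLIENT_DIR:-kafka-client}"

mkdir -p "$CLIENT_DIR"

# The credentials file holds only the password.
printf '%s\n' "$TRUSTSTORE_PASS" > "$CLIENT_DIR/broker_creds"

# Client settings: use SSL and trust the CA stored in the truststore.
cat > "$CLIENT_DIR/client_security.properties" <<EOF
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
ssl.truststore.password=${TRUSTSTORE_PASS}
EOF

ls "$CLIENT_DIR"
```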
Create the ConfigMap:
kubectl create configmap kafka-client --from-file kafka-client -n kafka
kubectl describe configmaps -n kafka kafka-client
Output:
Name: kafka-client
Namespace: kafka
Labels: <none>
Annotations: <none>
Data
====
broker_creds:
----
<redacted>
client_security.properties:
----
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.endpoint.identification.algorithm=
BinaryData
====
kafka.client.truststore.jks: 1306 bytes
Events: <none>
Confluent Kafka
This YAML file deploys a Kafka cluster within a Kubernetes namespace named kafka. It defines various Kubernetes resources required for setting up Kafka in a distributed manner.
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kafka
  namespace: kafka
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka
  name: kafka-headless
  namespace: kafka
spec:
  clusterIP: None
  clusterIPs:
  - None
  internalTrafficPolicy: Cluster
  ipFamilies:
  - IPv4
  ipFamilyPolicy: SingleStack
  ports:
  - name: tcp-kafka-int
    port: 9092
    protocol: TCP
    targetPort: tcp-kafka-int
  - name: tcp-kafka-ssl
    port: 9093
    protocol: TCP
    targetPort: tcp-kafka-ssl
  selector:
    app: kafka
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka
  name: kafka
  namespace: kafka
spec:
  podManagementPolicy: Parallel
  replicas: 3
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: kafka
  serviceName: kafka-headless
  template:
    metadata:
      labels:
        app: kafka
    spec:
      serviceAccountName: kafka
      containers:
      - command:
        - sh
        - -exc
        - |
          export KAFKA_NODE_ID=${HOSTNAME##*-} && \
          export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_NAME}.kafka-headless.kafka.svc.cluster.local:9092,SSL://${POD_NAME}.kafka-headless.kafka.svc.cluster.local:9093
          export KAFKA_SSL_TRUSTSTORE_FILENAME=${POD_NAME}.server.truststore.jks
          export KAFKA_SSL_KEYSTORE_FILENAME=${POD_NAME}.server.keystore.jks
          export KAFKA_OPTS="-Djavax.net.debug=all"
          exec /etc/confluent/docker/run
        env:
        - name: KAFKA_SSL_KEY_CREDENTIALS
          value: "broker_creds"
        - name: KAFKA_SSL_KEYSTORE_CREDENTIALS
          value: "broker_creds"
        - name: KAFKA_SSL_TRUSTSTORE_CREDENTIALS
          value: "broker_creds"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL"
        - name: CLUSTER_ID
          value: "6PMpHYL9QkeyXRj9Nrp4KA"
        - name: KAFKA_CONTROLLER_QUORUM_VOTERS
          value: "0@kafka-0.kafka-headless.kafka.svc.cluster.local:29093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:29093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:29093"
        - name: KAFKA_PROCESS_ROLES
          value: "broker,controller"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_NUM_PARTITIONS
          value: "3"
        - name: KAFKA_DEFAULT_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_MIN_INSYNC_REPLICAS
          value: "2"
        - name: KAFKA_CONTROLLER_LISTENER_NAMES
          value: "CONTROLLER"
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093,SSL://0.0.0.0:9093
        - name: POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        name: kafka
        image: docker.io/confluentinc/cp-kafka:7.5.0
        imagePullPolicy: IfNotPresent
        livenessProbe:
          failureThreshold: 6
          initialDelaySeconds: 60
          periodSeconds: 60
          successThreshold: 1
          tcpSocket:
            port: tcp-kafka-int
          timeoutSeconds: 5
        ports:
        - containerPort: 9092
          name: tcp-kafka-int
          protocol: TCP
        - containerPort: 29093
          name: tcp-kafka-ctrl
          protocol: TCP
        - containerPort: 9093
          name: tcp-kafka-ssl
          protocol: TCP
        resources:
          limits:
            cpu: "1"
            memory: 1400Mi
          requests:
            cpu: 250m
            memory: 512Mi
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - ALL
          runAsGroup: 1000
          runAsUser: 1000
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /etc/kafka/secrets/
          name: kafka-ssl
        - mountPath: /etc/kafka
          name: config
        - mountPath: /var/lib/kafka/data
          name: data
        - mountPath: /var/log
          name: logs
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        fsGroup: 1000
      terminationGracePeriodSeconds: 30
      volumes:
      - emptyDir: {}
        name: config
      - emptyDir: {}
        name: logs
      - name: kafka-ssl
        configMap:
          name: kafka-ssl
  updateStrategy:
    type: RollingUpdate
  volumeClaimTemplates:
  - apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: data
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 10Gi
      storageClassName: standard
      volumeMode: Filesystem
    status:
      phase: Pending
The deployment we will create will have the following components:
- Namespace: kafka. This is the namespace within which all components will be scoped.
- Service Account: kafka. Service accounts are used to control permissions and access to resources within the cluster.
- Headless Service: kafka-headless. It exposes ports 9092 (for PLAINTEXT communication) and 9093 (for SSL traffic).
- StatefulSet: kafka. It manages Kafka pods and ensures they have stable hostnames and storage.
The source code for this deployment can be found in this GitHub repository.
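The KAFKA_CONTROLLER_QUORUM_VOTERS value in the manifest lists every controller as id@host:port, where the host is the pod's DNS name under the headless service. If the replica count changes, this list has to change with it. The sketch below builds the string for a given number of replicas. The function name quorum_voters is a hypothetical helper, not something provided by Kafka or the Confluent image.

```shell
#!/bin/sh
set -eu

# Build the quorum voters string: id@host:port for each replica.
# Arguments: replica count, controller port.
quorum_voters() {
  replicas="$1"
  port="$2"
  voters=""
  i=0
  while [ "$i" -lt "$replicas" ]; do
    entry="${i}@kafka-${i}.kafka-headless.kafka.svc.cluster.local:${port}"
    if [ -z "$voters" ]; then
      voters="$entry"
    else
      voters="${voters},${entry}"
    fi
    i=$((i + 1))
  done
  echo "$voters"
}

# Three replicas on the controller port used in the manifest.
quorum_voters 3 29093
```

With three replicas and port 29093, the output matches the value set in the StatefulSet.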
Specifically for the SSL configuration, the following parameters were set in the StatefulSet:
- Configure the truststore, keystore, and password:
KAFKA_SSL_KEY_CREDENTIALS
KAFKA_SSL_KEYSTORE_CREDENTIALS
KAFKA_SSL_TRUSTSTORE_CREDENTIALS
- Configure the ports for the Kafka brokers to listen for SSL:
KAFKA_ADVERTISED_LISTENERS
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
KAFKA_LISTENERS
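Several of these values differ per pod, which is why the container command in the StatefulSet computes them at startup from the pod name rather than listing them under env. The sketch below replays that logic outside the cluster for a sample pod name. The function names are hypothetical and exist only to make the derivation visible.

```shell
#!/bin/sh
set -eu

# Strip everything up to the last "-" to get the StatefulSet ordinal,
# the same expansion the manifest applies to HOSTNAME.
node_id() {
  echo "${1##*-}"
}

# Advertise the pod's own DNS name on both the PLAINTEXT and SSL ports.
advertised_listeners() {
  fqdn="$1.kafka-headless.kafka.svc.cluster.local"
  echo "PLAINTEXT://${fqdn}:9092,SSL://${fqdn}:9093"
}

# Each pod picks the keystore that was generated for its own hostname.
keystore_filename() {
  echo "$1.server.keystore.jks"
}

# Assumption: a sample pod name; in the cluster this comes from metadata.name.
pod="kafka-2"
echo "KAFKA_NODE_ID=$(node_id "$pod")"
echo "KAFKA_ADVERTISED_LISTENERS=$(advertised_listeners "$pod")"
echo "KAFKA_SSL_KEYSTORE_FILENAME=$(keystore_filename "$pod")"
```

This naming scheme is the reason the keystores were created as kafka-0, kafka-1, and kafka-2: the file name and the certificate's CN both have to line up with the pod that loads them.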
Creating the Deployment
Clone the repo:
git clone https://github.com/rafaelmnatali/kafka-k8s.git
cd kafka-k8s/ssl
Deploy Kafka using the following commands:
kubectl apply -f 00-namespace.yaml
kubectl apply -f 01-kafka-local.yaml
Verify Communication Across Brokers
There should now be three Kafka brokers, each running on a separate pod within your cluster. Name resolution for the headless service and the three pods within the StatefulSet is automatically configured by Kubernetes as they are created, allowing for communication across brokers. See the related documentation for more details on this feature.
You can check the first pod's logs with the following command:
kubectl logs kafka-0 -n kafka
The name resolution of the three pods can take more time to work than it takes the pods to start, so you may see UnknownHostException warnings in the pod logs initially:
WARN [RaftManager nodeId=2] Error connecting to node kafka-1.kafka-headless.kafka.svc.cluster.local:29093 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient) java.net.UnknownHostException: kafka-1.kafka-headless.kafka.svc.cluster.local ...
But eventually each pod will successfully resolve pod hostnames and end with a message stating the broker has been unfenced:
INFO [Controller 0] Unfenced broker: UnfenceBrokerRecord(id=1, epoch=176) (org.apache.kafka.controller.ClusterControlManager)
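Scanning the logs by eye for these messages gets tedious with three brokers. The following sketch filters log text for the unfenced message and prints the broker IDs it finds. The function name unfenced_ids is a hypothetical helper, and the sample line is the one shown above.

```shell
#!/bin/sh
set -eu

# Extract the IDs of brokers reported as unfenced from log text on stdin.
unfenced_ids() {
  sed -n 's/.*Unfenced broker: UnfenceBrokerRecord(id=\([0-9][0-9]*\),.*/\1/p' | sort -un
}

# Sample line taken from the log output above.
sample='INFO [Controller 0] Unfenced broker: UnfenceBrokerRecord(id=1, epoch=176) (org.apache.kafka.controller.ClusterControlManager)'
printf '%s\n' "$sample" | unfenced_ids
```

Against a live cluster, the same filter can be fed from kubectl logs, for example kubectl logs kafka-0 -n kafka | unfenced_ids. The unfence records appear in the log of whichever pod is the active controller, so it may be necessary to check more than one pod. Separately, kubectl rollout status statefulset/kafka -n kafka waits until the pods report ready.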
Create a Topic Using the SSL Endpoint
The Kafka StatefulSet should now be up and running successfully. Now we can create a topic using the SSL endpoint.
You can deploy Kafka Client using the following command:
kubectl apply -f 02-kafka-client.yaml
Check if the Pod is Running:
kubectl get pods
Output:
NAME        READY   STATUS    RESTARTS   AGE
kafka-cli   1/1     Running   0          12m
Connect to the pod kafka-cli:
kubectl exec -it kafka-cli -- bash
Create a topic named test-ssl with three partitions and a replication factor of 3.
kafka-topics --create --topic test-ssl --partitions 3 --replication-factor 3 --bootstrap-server ${BOOTSTRAP_SERVER} --command-config /etc/kafka/secrets/client_security.properties
Output:
Created topic test-ssl.
The environment variable BOOTSTRAP_SERVER contains the list of brokers, which saves typing the full addresses.
List all the topics in Kafka:
kafka-topics --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9093 --list --command-config /etc/kafka/secrets/client_security.properties
Output:
test
test-ssl
test-test
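Listing topics shows that the SSL handshake works for admin requests. A short produce-and-consume round trip can exercise the same listener with data. The sketch below is meant to run inside the kafka-cli pod and makes two assumptions: the broker list is built from the pod DNS names used in this article (the real BOOTSTRAP_SERVER value comes from the client manifest, which is not shown here), and the same properties file is reused for producer and consumer. The function names are hypothetical.

```shell
#!/bin/sh
set -eu

# Assumption: the SSL listener of each broker, built from the pod DNS names.
bootstrap_servers() {
  list=""
  for i in 0 1 2; do
    host="kafka-${i}.kafka-headless.kafka.svc.cluster.local:9093"
    list="${list:+${list},}${host}"
  done
  echo "$list"
}

CONFIG=/etc/kafka/secrets/client_security.properties

# Send one message to test-ssl and read one message back over SSL.
smoke_test() {
  servers="$(bootstrap_servers)"
  echo "hello over ssl" | kafka-console-producer \
    --bootstrap-server "$servers" --topic test-ssl --producer.config "$CONFIG"
  kafka-console-consumer \
    --bootstrap-server "$servers" --topic test-ssl --consumer.config "$CONFIG" \
    --from-beginning --max-messages 1
}

# Preview the broker list; call smoke_test inside the client pod to run the check.
bootstrap_servers
```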
Summary and Next Steps
Published at DZone with permission of Rafael Natali. See the original article here.