Transmitting Log Messages to a Kafka Topic Using Kafka Log4j2 Appender
In this article, learn more about Log4j, a popular logging package for Java development that offers a simple, comprehensive, and configurable logging system.
Context
Whether it is a basic "Hello, World!" application or a complex banking solution like Stripe, developing an application is a fascinating process. That process typically includes extensive testing and quality assurance to ensure not only that the requirements are met but also that the application is reliable enough for users to consume.
Thorough as this procedure is, faults and errors still occur over time as users interact with the program, and the application must be troubleshot quickly to remedy them.
Logs record the events that occur within a program, and they provide a way to identify which part of the application is not operating as planned. Log4j is a popular logging package for Java development that offers a simple, comprehensive, and configurable logging system.
Kafka is the most popular open-source stream-processing platform for gathering, processing, storing, and analyzing data at scale. It can handle millions of messages per second and is noted for its exceptional performance, low latency, fault tolerance, and high throughput.
Prerequisites
Java 8 installed
Docker installed
Basic understanding of Log4j
Basic understanding of Kafka
Steps
- Create a topic for Log4j to publish log messages to by creating a docker-compose file with the snippet below. Then, run docker-compose up -d to spin up the Kafka and ZooKeeper containers.
version: '2.1'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  # reachable on 9092 from the host and on 29092 from inside docker compose
  kafka:
    image: confluentinc/cp-kafka:6.1.1
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    expose:
      - '29092'
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_MIN_INSYNC_REPLICAS: '1'

  init-kafka:
    image: confluentinc/cp-kafka:6.1.1
    depends_on:
      - kafka
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka:29092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic my-topic-1 --replication-factor 1 --partitions 1

      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server kafka:29092 --list
      "
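Before starting the application, it can be handy to confirm from Java that the host-facing listener on localhost:9092 is actually accepting connections. A minimal sketch using only the JDK (the isReachable helper name is my own, not part of any Kafka API):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isReachable("localhost", 9092, 500)
                ? "Kafka listener is reachable"
                : "Kafka listener is NOT reachable - is the container up?");
    }
}
```

This only proves the port is open, not that the broker is healthy, but it catches the most common "forgot to run docker-compose up" mistake quickly.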
- Create a Spring Boot application with Maven, add kafka-log4j-appender and kafka as dependencies, and exclude Spring Boot's default logging. The POM file should look like this:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-log4j-appender</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>1.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
- Create a HelloWorld class that simply logs "Hello World sent to Kafka topic at" followed by the current time at a fixed interval using the Log4j logger, and invoke the createLogs() method in the main class. This class will serve as the producer that publishes the log messages to the Kafka topic created with the docker-compose file.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class HelloWorld {

    private static final Logger logger = LoggerFactory.getLogger(HelloWorld.class);

    public static void createLogs() throws InterruptedException {
        int k = 0;
        boolean b = true;
        while (b) {
            // one log message per second, timestamped HH:mm:ss
            logger.info("Hello World sent to Kafka topic at "
                    + DateTimeFormatter.ofPattern("HH:mm:ss").format(LocalDateTime.now()));
            k++;
            b = k < Integer.MAX_VALUE;
            Thread.sleep(1000);
        }
    }
}
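The log line above embeds an HH:mm:ss timestamp via DateTimeFormatter; that formatting step can be checked in isolation with a fixed time to keep the output deterministic (the helper and class names here are my own):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampDemo {

    // Formats a timestamp the same way createLogs() does.
    static String logTime(LocalDateTime time) {
        return DateTimeFormatter.ofPattern("HH:mm:ss").format(time);
    }

    public static void main(String[] args) {
        // HH pads single-digit hours with a leading zero.
        System.out.println(logTime(LocalDateTime.of(2021, 1, 1, 9, 5, 30))); // prints 09:05:30
    }
}
```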
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Log4jAppenderApplication {

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(Log4jAppenderApplication.class, args);
        HelloWorld.createLogs();
    }
}
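Note that createLogs() loops forever, so it blocks the main thread after SpringApplication.run returns. If you want the application to keep doing other work, one option is to run the producer on a background thread instead. A sketch with the JDK's ExecutorService, where the task body is simplified to a counter so it runs without Kafka (all names here are my own):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BackgroundLogger {

    static final AtomicInteger sent = new AtomicInteger();

    // Stand-in for createLogs(): emits a fixed number of "log lines" then stops.
    static void createLogs(int count) {
        for (int i = 0; i < count; i++) {
            sent.incrementAndGet();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> createLogs(5)); // main thread stays free
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("emitted " + sent.get() + " log lines");
    }
}
```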
- The next step is to configure Log4j with the kafka-log4j-appender. Appenders are in charge of delivering LogEvents to their destinations; in this case, the destination is the Kafka topic. The logging level for Kafka's own classes is set to WARN to avoid recursive logging. Because the Kafka appender is synchronous by default and blocks until the Kafka server acknowledges each message, it is wrapped in an Async appender to enable asynchronous logging. The AsyncAppender accepts references to other appenders and writes LogEvents to them on a separate thread.
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info" name="spring-boot-kafka-log" packages="com.reloadly">
    <Appenders>
        <Kafka name="Kafka" topic="my-topic-1">
            <PatternLayout pattern="%date %message"/>
            <Property name="bootstrap.servers">localhost:9092</Property>
        </Kafka>
        <Async name="Async">
            <AppenderRef ref="Kafka"/>
        </Async>
        <Console name="stdout" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5p [%-7t] %F:%L - %m%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Async"/>
            <AppenderRef ref="stdout"/>
        </Root>
        <Logger name="org.apache.kafka" level="WARN"/><!-- avoid recursive logging -->
    </Loggers>
</Configuration>
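The console pattern above uses %-5p and %-7t, meaning the log level is left-justified and padded to five characters and the thread name to seven, so columns line up across log lines. The same left-justified padding is plain String.format behavior, which you can sanity-check on its own (this is an illustration of the padding rule, not part of Log4j itself):

```java
public class PatternPaddingDemo {

    // Mimics the %-5p [%-7t] padding from the Log4j2 console pattern.
    static String pad(String level, String thread) {
        return String.format("%-5s [%-7s]", level, thread);
    }

    public static void main(String[] args) {
        System.out.println(pad("INFO", "main"));    // prints "INFO  [main   ]"
        System.out.println(pad("WARN", "worker1")); // prints "WARN  [worker1]"
    }
}
```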
- Next, we'll process the logs that our simple Java application sends to the Kafka cluster. To do so, we'll construct a simple consumer class whose sole purpose is to listen to the Kafka topic where the logs are being streamed and process them; in this case, it simply prints the log messages to the console.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Consumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "test-group");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        List<String> topics = new ArrayList<>();
        topics.add("my-topic-1");
        kafkaConsumer.subscribe(topics);

        try {
            while (true) {
                // block for up to 20 minutes waiting for new records
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMinutes(20));
                for (ConsumerRecord<String, String> message : records) {
                    System.out.println(String.format("Topic - %s, Partition - %d, Value: %s",
                            message.topic(), message.partition(), message.value()));
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            kafkaConsumer.close();
        }
    }
}
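One caveat: the while (true) loop above only exits via an exception, so the consumer has no clean shutdown path. A common pattern is a flag flipped from a shutdown hook (or another thread) that lets the loop finish its current iteration and fall through to close(). Sketched below with plain Java and a simulated poll so it runs without a broker; the class and method names are my own:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class GracefulLoop {

    static final AtomicBoolean running = new AtomicBoolean(true);

    // Simulated poll loop: in the real consumer, each iteration would call
    // kafkaConsumer.poll(...) and process the returned records.
    static int runLoop(int stopAfter) {
        int polls = 0;
        while (running.get()) {
            polls++;                 // stand-in for poll + processing
            if (polls == stopAfter) {
                running.set(false);  // in real code, a shutdown hook would do this
            }
        }
        return polls;                // finally { kafkaConsumer.close(); } would go here
    }

    public static void main(String[] args) {
        System.out.println("polled " + runLoop(3) + " times");
    }
}
```

In a real consumer, the shutdown hook would typically also call kafkaConsumer.wakeup() so that a poll blocked inside the loop returns immediately instead of waiting out its timeout.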
- Ensure that the Kafka container is up. Then, run Log4jAppenderApplication.java and Consumer.java.
You can find the project on GitHub.