Live Dashboard Using Apache Kafka and Spring WebSocket
Want to learn more about using Apache Kafka and Spring WebSocket? Check out this tutorial on how to create a live dashboard of real-time temperature values.
This article walks through a demo project that I have been working on over the last week: a live web app dashboard built with Apache Kafka and Spring WebSocket. The example subject is a "Live Temperature Update." Kafka feeds the real-time temperature values, and the application reads each value as it arrives and updates the view.
Apache Kafka
Apache Kafka is a distributed publish-subscribe streaming platform that can be used to build an enterprise messaging system.
Spring Support for Apache Kafka
The Spring for Apache Kafka project (spring-kafka) provides support for publishing and subscribing to Kafka data in real time.
Spring WebSocket
The Spring Framework also includes WebSocket support, which can be used to send messages back and forth between the client and server in real time. This project uses the STOMP protocol as the messaging format on top of the WebSocket connection.
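To make the STOMP exchange concrete, here is a minimal sketch of the text frames involved: a command line, header lines, a blank line, an optional body, and a terminating NUL character. The class and method names are hypothetical and exist only for illustration, and the subscription and message IDs are made up. In the demo itself these frames are produced by the STOMP client library in the browser and by Spring on the server. The destination /topic/temperature is the one used later in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: hand-builds STOMP text frames to show the wire format.
// StompFrameSketch and its methods are hypothetical names, not part of Spring.
// Header value escaping is deliberately left out to keep the sketch short.
class StompFrameSketch {

    // A STOMP frame is: COMMAND, header lines, a blank line, the body, then NUL.
    static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            sb.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        sb.append('\n').append(body).append('\0');
        return sb.toString();
    }

    // Frame a client sends to subscribe to a destination.
    static String subscribe(String subscriptionId, String destination) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("id", subscriptionId);
        headers.put("destination", destination);
        return frame("SUBSCRIBE", headers, "");
    }

    // Frame a broker delivers to a subscriber when a value is published.
    static String message(String subscriptionId, String messageId,
                          String destination, String body) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("subscription", subscriptionId);
        headers.put("message-id", messageId);
        headers.put("destination", destination);
        return frame("MESSAGE", headers, body);
    }

    public static void main(String[] args) {
        // Show the NUL terminator as ^@ so the frames are readable on a console.
        System.out.println(subscribe("sub-0", "/topic/temperature").replace("\0", "^@"));
        System.out.println(message("sub-0", "m-1", "/topic/temperature", "23.5").replace("\0", "^@"));
    }
}
```

Running the main method prints one SUBSCRIBE frame and one MESSAGE frame, which is roughly the traffic exchanged for each temperature value once the browser has connected.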
My Demo Project
I have created a simple demo application to explore these technologies. In this application, the view page shows the current temperature as text along with a simple line chart (drawn with Chart.js), and both update in real time.
This article does not cover installing and configuring Kafka on your workstation. Plenty of articles on the Internet explain that.
Maven Dependencies Needed
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>webjars-locator-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>sockjs-client</artifactId>
        <version>1.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>stomp-websocket</artifactId>
        <version>2.3.3</version>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>bootstrap</artifactId>
        <version>3.3.7</version>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>jquery</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-thymeleaf</artifactId>
        <version>2.0.4.RELEASE</version>
    </dependency>
</dependencies>
Kafka Consumer Configuration in Spring
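import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrapserver}")
    private String bootstrapServer;

    // Consumer properties used by the consumer factory below.
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "temp-groupid.group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}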
The configuration sets the following properties:
BOOTSTRAP_SERVERS_CONFIG gives the address of the server on which Kafka is running.
KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG name the classes used to deserialize the key and value of each record read from the Kafka topic.
GROUP_ID_CONFIG sets the Kafka consumer group ID.
AUTO_OFFSET_RESET_CONFIG sets where the consumer starts reading when its group has no committed offset. In this project, we use the value "latest" so that we receive only the values published after the consumer starts. Alternatively, we can use "earliest" to read all the values in the topic from the beginning.
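The effect of the offset reset setting can be sketched with a small model. This is not Kafka code: the class, method, and parameter names are hypothetical, and the model only captures the rule that a valid committed offset takes precedence and the reset policy is a fallback.

```java
import java.util.OptionalLong;

// Illustrative model of where a consumer group starts reading a partition.
// OffsetResetSketch and startOffset are hypothetical names, not a Kafka API.
class OffsetResetSketch {

    static long startOffset(OptionalLong committed, String autoOffsetReset,
                            long logStartOffset, long logEndOffset) {
        // A valid committed offset always wins; the reset policy is only a fallback.
        if (committed.isPresent()
                && committed.getAsLong() >= logStartOffset
                && committed.getAsLong() <= logEndOffset) {
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset; // replay everything still in the log
            case "latest":
                return logEndOffset;   // skip history, wait for new records
            default:
                // Models the "none" policy, which reports an error instead of guessing.
                throw new IllegalStateException("No valid offset and policy is " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // A new group on a partition that currently holds offsets 0..99.
        System.out.println(startOffset(OptionalLong.empty(), "latest", 0, 100));
        System.out.println(startOffset(OptionalLong.empty(), "earliest", 0, 100));
        // A group that already committed offset 42 resumes there under either policy.
        System.out.println(startOffset(OptionalLong.of(42), "latest", 0, 100));
    }
}
```

For a live dashboard, "latest" is usually the more natural choice, since replaying old readings on startup would draw stale points on the chart with the current time as their label.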
Spring WebSocket Configuration
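import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    // Endpoint that browsers connect to, with a SockJS fallback.
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/live-temperature").withSockJS();
    }

    // In-memory broker for destinations that start with /topic.
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic");
    }
}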
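The configuration class implements WebSocketMessageBrokerConfigurer to configure the WebSocket: registerStompEndpoints exposes the /live-temperature endpoint with a SockJS fallback, and configureMessageBroker enables a simple in-memory broker for destinations that start with /topic.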
Kafka Consumer and Message Publisher for WebSocket
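import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @Autowired
    private SimpMessagingTemplate template;

    // Forward each numeric record from the Kafka topic to the WebSocket subscribers.
    @KafkaListener(topics = "${kafka.topic}")
    public void consume(@Payload String message) {
        if (isNumeric(message)) {
            template.convertAndSend("/topic/temperature", message);
        }
    }

    // Returns true when the payload parses as a double.
    public boolean isNumeric(String str) {
        try {
            Double.parseDouble(str);
            return true;
        } catch (NumberFormatException nfe) {
            return false;
        }
    }
}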
In this class, the method annotated with @KafkaListener receives each message from the Kafka topic, and template.convertAndSend converts the message and sends it to the /topic/temperature WebSocket destination.
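One detail of the numeric check is worth noting: Double.parseDouble accepts the strings "NaN" and "Infinity", and it throws a NullPointerException rather than a NumberFormatException for a null argument. If that matters for your data, a stricter check along the following lines could replace isNumeric. This is a sketch only; TemperatureParserSketch and parseTemperature are hypothetical names and are not part of the demo project.

```java
import java.util.OptionalDouble;

// Illustrative alternative to isNumeric: returns the parsed value, and treats
// null, non-numeric text, NaN, and infinities as "no reading".
// TemperatureParserSketch is a hypothetical name, not part of the demo project.
class TemperatureParserSketch {

    static OptionalDouble parseTemperature(String raw) {
        if (raw == null) {
            return OptionalDouble.empty();
        }
        try {
            double value = Double.parseDouble(raw);
            // Reject NaN and +/-Infinity, which parse successfully but cannot be plotted.
            return Double.isFinite(value) ? OptionalDouble.of(value) : OptionalDouble.empty();
        } catch (NumberFormatException e) {
            return OptionalDouble.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parseTemperature("23.5").isPresent());
        System.out.println(parseTemperature("NaN").isPresent());
        System.out.println(parseTemperature("not a number").isPresent());
    }
}
```

In the listener, the call would then become a check on isPresent() before forwarding the message, so that only finite readings reach the chart.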
View Page
<!DOCTYPE html>
<html>
<head>
    <meta charset="ISO-8859-1">
    <title>Home</title>
    <link href="/webjars/bootstrap/css/bootstrap.min.css" rel="stylesheet">
    <script src="/webjars/jquery/jquery.min.js"></script>
    <script src="/webjars/sockjs-client/sockjs.min.js"></script>
    <script src="/webjars/stomp-websocket/stomp.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/moment.js/2.22.2/moment.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.7.2/Chart.min.js"></script>
    <script type="text/javascript">
        var stompClient;
        /* Chart Configuration */
        var config = {
            type : 'line',
            data : {
                labels : [],
                datasets : [ {
                    label : 'Temperature',
                    backgroundColor : 'rgb(255, 99, 132)',
                    borderColor : 'rgb(255, 99, 132)',
                    data : [],
                    fill : false
                } ]
            },
            options : {
                responsive : true,
                title : {
                    display : true,
                    text : 'Temperature'
                },
                tooltips : {
                    mode : 'index',
                    intersect : false
                },
                hover : {
                    mode : 'nearest',
                    intersect : true
                },
                scales : {
                    xAxes : [ {
                        display : true,
                        type : 'time',
                        time : {
                            displayFormats : {
                                quarter : 'h:mm:ss a'
                            }
                        },
                        scaleLabel : {
                            display : true,
                            labelString : 'Time'
                        }
                    } ],
                    yAxes : [ {
                        display : true,
                        scaleLabel : {
                            display : true,
                            labelString : 'Value'
                        }
                    } ]
                }
            }
        };
        /* Document Ready Event */
        $(document).ready(function() {
            var ctx = document.getElementById('lineChart').getContext('2d');
            window.myLine = new Chart(ctx, config);
            /* Configuring WebSocket on Client Side */
            var socket = new SockJS('/live-temperature');
            stompClient = Stomp.over(socket);
            stompClient.connect({}, function(frame) {
                stompClient.subscribe('/topic/temperature', function(temperature) {
                    $('#temperature').text(temperature.body);
                    /* Push new data On X-Axis of Chart */
                    config.data.labels.push(new Date());
                    /* Push new data on Y-Axis of chart (the message body is a string) */
                    config.data.datasets.forEach(function(dataset) {
                        dataset.data.push(parseFloat(temperature.body));
                    });
                    window.myLine.update();
                });
            });
        });
    </script>
</head>
<body>
    <div class="alert alert-danger" role="alert" style="width:300px;margin-left:40%;margin-top:10px;">
        <p class="text-center">Current Temperature : <b id="temperature">0</b></p>
    </div>
    <div class="model">
        <div class="modal-dialog" style="width:80%;height:auto">
            <div class="modal-content">
                <div class="modal-header">
                    <h5 class="modal-title">Temperature</h5>
                </div>
                <div class="model-body">
                    <div class="container" style="width:80%">
                        <canvas id="lineChart"></canvas>
                    </div>
                </div>
            </div>
        </div>
    </div>
</body>
</html>
On the client side, we open a SockJS connection to the /live-temperature endpoint and use a STOMP client over it to subscribe to /topic/temperature, where the server publishes each value.
Properties File
server.port=5656
#Kafka Topic and Server Port
kafka.topic=livetemperature
kafka.bootstrapserver=localhost:9092
You can find the complete source code on GitHub.