How to Stream Sensor Data to Apache Pinot for Real Time Analysis
Learn how to use Arduino, MQTT, Kafka, and Apache Pinot to stream environmental data for real-time access to sensor data.
Join the DZone community and get the full member experience.
Join For FreeThis is part 2 of a multi-part sample application that I've been building using Kafka and Apache Pinot. In the first part, we built an NFC Badge Reader to record when badges are swiped. In this part, we will begin ingesting some environmental data from a sensor into the mix. I'm not going to spoil the fun by telling you where this is ultimately headed, but feel free to reach out to me on Twitter and see if you can guess!
As always, all of the code is available on GitHub.
Hardware
Hardware is part of this demo because I like building hardware, and it’s always more fun to have something that you can actually interact with. The hardware we’ll be building is a CO2 sensor that reads the CO2 concentration in the air, the temperature, and the relative humidity, and sends it to a Kafka topic. The sensor is based on the SCD-30 sensor, which is a non-dispersive infrared (NDIR) sensor that measures the CO2 concentration in the air. It’s a bit expensive, but it is also an industrial-quality sensor that can be relied on to give accurate readings.
You will see a lot of projects based on cheaper “eCO2” sensors, but they are not as accurate as the SCD-30. I have done extensive testing of a variety of CO2 sensors and the eCO2 sensors can be off by as much as 200%, which is not something you want to rely on if accuracy matters.
The sensor is connected to a SparkFun ESP32 Thing Plus - Qwiic, which reads the sensor data and sends it to a Kafka topic.
I used these 2 components specifically because they both implement the Qwiic connector, which makes it easy to connect them together. The Qwiic connector is a standard 4-pin connector that uses I2C and is used by a number of different sensors and boards. It eliminates the need for soldering since you can just plug the sensor into the board and start coding.
The ESP-32 Thing Plus is what’s referred to as a "Feather" because of the form factor. Really, you can use any ESP-32 board for this but the Feather is a great board for this project because it has built-in WiFi and Bluetooth, which makes it easy to connect to the Internet. It also has a built-in battery charger, which makes it easy to power the sensor with a battery if you want to place it somewhere out of reach. I programmed it using the Arduino IDE, which is a great tool for prototyping.
#include <ArduinoJson.h> // Serialize and deserialize JSON
#include "EspMQTTClient.h" // handle MQTT
#include <Wire.h> // 2Wire protocol
#include "SparkFun_SCD30_Arduino_Library.h" // The CO2 Sensor
EspMQTTClient client(
"SSID", // Your SSID
"PASSWD", // Your WiFi Password
"mqtt-broker", // MQTT Broker server ip or hostname
"PinotClient", // Client name that uniquely identify your device
8883 // The MQTT port, default to 1883. this line can be omitted
);
SCD30 airSensor; // our sensor
void setup() {
Serial.begin(115200);
// Optional functionalities of EspMQTTClient
client.enableDebuggingMessages(); // Enable debugging messages sent to serial output
Wire.begin();
if (airSensor.begin() == false) {
Serial.println("Air sensor not detected. Please check wiring. Freezing...");
while (1) // infinite loop of nothingness
;
}
}
// This function is called once everything is connected (Wifi and MQTT)
// WARNING : YOU MUST IMPLEMENT IT IF YOU USE EspMQTTClient
void onConnectionEstablished() {
while (true) { // do this forever
while (airSensor.dataAvailable()) {
int co2 = airSensor.getCO2();
float temp = airSensor.getTemperature();
float hum = airSensor.getHumidity();
StaticJsonDocument<96> doc;
doc["sensor"] = "SCD-30";
doc["co2"] = co2;
doc["temp"] = temp;
doc["humid"] = hum;
char buff[128];
serializeJson(doc, buff);
serializeJson(doc, Serial); // this is for debugging
Serial.println();
client.publish("co2", buff); // publish the data to the co2 topic
}
}
}
void loop() {
client.loop();
}
That’s the entire Arduino sketch. It connects to WiFi, connects to an MQTT broker, and then reads the sensor data and publishes it to the co2
topic. You can find the code on GitHub. The SCD-30 sensor can really only provide about one reading/second, so there’s no need to do anything fancy to make it faster.
Getting the Readings Into Kafka
As it turns out there isn’t a good way to get data straight from an Arduino device into Kafka which is why we sent it to the MQTT broker above. Now that it’s in the MQTT broker we have to get it out and feed it into our Kafka topic.
One of the other drawbacks of using an Arduino device is that they are not very good at keeping time. It is possible to use Network Time Protocol (NTP) to keep the time, but it’s not very reliable. To get around these two problems, I wrote a small program in Go that reads the data from the MQTT broker, gives it an accurate timestamp, and then publishes it to a Kafka topic.
To make things easier, we will reuse some code from our Badge Reader project.
package main
import (
"bufio"
"crypto/tls"
"encoding/json"
"fmt"
"os"
"strings"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
/* Struct to hold the data from the MQTT message */
type env struct {
Sensor string `json:"sensor"`
CO2 int32 `json:"co2"`
Temp float32 `json:"temp"`
Humid float32 `json:"humid"`
Time int32 `json:"time"`
}
That struct should look familiar as it’s almost identical to the one we used in the Arduino code. The only difference is the Time
field, which we will use to store the timestamp.
We will also re-use all the code from the Badge Reader project to connect to Kafka and publish the data to the topic, so I won’t reproduce it here.
/* Read MQTT messages from the specified broker and topic and send them to the kafka broker */
func ReadMQTTMessages(brokerUri string, username string, password string, topic string, qos byte) error {
// Create an MQTT client options object
opts := MQTT.NewClientOptions()
// Set the broker URI, username, and password
opts.AddBroker(brokerUri)
opts.SetUsername(username)
opts.SetPassword(password)
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
// Create an MQTT client
client := MQTT.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
// Subscribe to the specified topic with the specified QoS level
if token := client.Subscribe(topic, qos, func(client MQTT.Client, message MQTT.Message) {
envData := env{}
json.Unmarshal(message.Payload(), &envData)
envData.Time = int32(time.Now().UnixMilli())
mess, _ := json.Marshal(envData)
fmt.Printf("Received message: %s\n", mess)
sendToKafka("co_2", string(mess))
}); token.Wait() && token.Error() != nil {
return token.Error()
}
// Wait for messages to arrive
for {
select {}
}
}
func main() {
err := ReadMQTTMessages("tcp://broker-address:8883", "", "", "co2", 0)
if err != nil {
fmt.Printf("Error reading MQTT messages: %v\n", err)
}
}
The main()
function calls the ReadMQTTMessages()
function which connects to the MQTT broker and subscribes to the co2
topic. When a message is received it is parsed into the env
struct, the timestamp is added, and then it is published to the Kafka topic co_2
.
Consuming the Data
Now that we have the data in Kafka, we can begin consuming it into StarTree Cloud. Since this project is building on the previous Badge Reader project, I’m going to reuse that StarTree Cloud instance and add a new table to it.
Since I’ve already been sending data to the Kafka topic, once I add the source (with the right credentials) StarTree Cloud should show me some data that it has read from the topic.
After we click through to the table we can see the data that has been read from the Kafka topic.
From there, we can go to the Pinot Query Editor and start writing queries.
Conclusion
In this post, we looked at how to use an Arduino device to read data from a sensor and send it to an MQTT broker. We then used a Go program to read the data from the MQTT broker, add a timestamp, and send it to a Kafka topic. Finally, we used StarTree Cloud to read the data from the Kafka topic and make it available for querying in Apache Pinot.
Opinions expressed by DZone contributors are their own.
Comments