Real-Time Pulsar and Python Apps on a Pi
Build a Python application on a Raspberry Pi that streams sensor data and more from the edge to any and all data stores while processing data in event time.
Today we will look at an easy way to build Python streaming applications from the edge to the cloud. Let's walk through building a Python application on a Raspberry Pi that streams sensor data and device metrics from the edge to any data store you choose, while processing data in event time.
My GitHub repository has all of the code, configuration, and scripts needed to build and run this application.
If you wish to use the same data source as I did, you will need a Raspberry Pi 3 or 4. I know it is hard to obtain a Pi now, but you may already have one. I highly recommend the Pimoroni Breakout Garden Hat, as it lets you swap an interchangeable set of sensors in and out with no soldering or difficulty. Pimoroni also provides solid Python drivers for all of its sensors, which is ideal for an application like ours.
Gear/Hardware
- Raspberry Pi 3 Model B Rev 1.2, Raspbian Bullseye, armv7l
- Pimoroni Breakout Garden Hat
- 1.12" Mono OLED Breakout 128x128 White/Black Screen
- BME680 Air Quality, Temperature, Pressure, Humidity Sensor
- LSM303D 6DoF Motion Sensor (X, Y, Z Axes)
- BH1745 Luminance and Color Sensor
- LTR-559 Light and Proximity Sensor (0.01 lux to 64,000 lux)
- VL53L1X Time of Flight (TOF) Sensor
Fully Setup Raspberry Pi 3 with Garden Hat and Sensors
Software/Libraries
- Python 3.9
- Pulsar Python Client 2.10 (Avro): pip3 install pulsar-client[avro]
- Python Breakout Garden
- Python psutil: https://pypi.org/project/psutil/
- Python luma.oled: pip3 install --upgrade luma.oled
- Libraries: sudo apt-get install python3 python3-pip python3-pil libjpeg-dev zlib1g-dev libfreetype6-dev liblcms2-dev libopenjp2-7 libtiff5 -y
I recommend you install Python 3.9 or later on your Pi, which may require updating to the Raspberry Pi OS Bullseye release with the latest drivers. This update could take a while, as there are a lot of related libraries, SDKs, and build tools to update. Once you have rebooted and have the latest Python, you will need to install the Breakout Garden libraries and related drivers. We are also utilizing the Python library psutil, which gives us access to Pi hardware metrics such as disk space, CPU, and network information. We like to report these device metrics alongside the sensor readings or as metadata, which is helpful once you expand to thousands of devices.
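As a quick sanity check that psutil can read these metrics on your Pi, here is a minimal sketch (nothing in it is specific to our application):
import psutil
# CPU, memory, disk, and network interface details used later in our producer.
print("CPU %:", psutil.cpu_percent(interval=1))
print("Memory %:", psutil.virtual_memory().percent)
print("Free disk MB: {:.1f}".format(psutil.disk_usage("/").free / 1024 / 1024))
print("wlan0 addresses:", psutil.net_if_addrs().get("wlan0"))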
Python Edge Application with Apache Pulsar, Apache NiFi, and Apache Flink
In the above diagram, we can see the full application that we are going to build today, from the sensor to the final data stores. In this application, you will build a Pulsar producer that sends data to the pi-sensors topic. This topic will be consumed by a continuous Flink SQL query, an Apache NiFi flow, and an Apache Spark Structured Streaming job, which send the data to a NoSQL store and to the file storage of your choice.
The first thing to do is to create the topic that will act as your real-time conduit for sensor and device data. This topic will live under a tenant and namespace inside our Pulsar cluster. I recommend you create a tenant and namespace for this application. We are utilizing the default ones in the code below. You will need the Pulsar Admin client installed. You can also create your topic from the StreamNative Cloud Web UI if you are utilizing StreamNative for your Apache Pulsar hosting needs.
StreamOps
bin/pulsar-admin topics create "persistent://public/default/pi-sensors"
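If you would like the dedicated tenant and namespace recommended above rather than the defaults, a sketch with pulsar-admin (the iot/garden names are illustrative):
bin/pulsar-admin tenants create iot
bin/pulsar-admin namespaces create iot/garden
bin/pulsar-admin topics create "persistent://iot/garden/pi-sensors"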
Now that you have a topic, you can begin sending data to it.
First, let's build a schema to model our data, as this is a best practice. We can use JSON or Avro.
import pulsar
import logging
from pulsar.schema import *
from pulsar.schema import AvroSchema
from pulsar.schema import JsonSchema
from pulsar import Client, AuthenticationOauth2
class breakoutsensor(Record):
    uuid = String()
    ipaddress = String()
    cputempf = Integer()
    runtime = Integer()
    host = String()
    hostname = String()
    macaddress = String()
    endtime = String()
    te = String()
    cpu = Float()
    diskusage = String()
    memory = Float()
    rowid = String()
    systemtime = String()
    ts = Integer()
    starttime = String()
    BH1745_red = Float()
    BH1745_green = Float()
    BH1745_blue = Float()
    BH1745_clear = Float()
    VL53L1X_distance_in_mm = Float()
    ltr559_lux = Float()
    ltr559_prox = Float()
    bme680_tempc = Float()
    bme680_tempf = Float()
    bme680_pressure = Float()
    bme680_humidity = Float()
    lsm303d_accelerometer = String()
    lsm303d_magnetometer = String()
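The same Record class can back either schema flavor. For example, switching the producer in the full application below from JSON to Avro is just a matter of wrapping the class in AvroSchema when creating the producer (a sketch; the topic name is illustrative, and client is the Pulsar client created later in the program):
sensorschema = AvroSchema(breakoutsensor)
producer = client.create_producer(topic='persistent://public/default/pi-sensors-avro',
                                  schema=sensorschema)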
Now that we have defined our schema, let's build the code to grab our sensor readings and send them as structured JSON to our Pulsar topic.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import os.path
import re
import sys
import os
from time import sleep
from time import gmtime, strftime
import numpy as np
import datetime
import subprocess
import base64
import uuid
import traceback
import math
import random, string
import socket
import json
import time
import psutil
# import paho.mqtt.client as paho
from luma.core.interface.serial import i2c
from luma.core.render import canvas
from luma.oled.device import sh1106
#
# Sensors
#
from bh1745 import BH1745
from ltr559 import LTR559
import VL53L1X
import ltr559
import bme680
from lsm303d import LSM303D
import pulsar
import logging
from pulsar.schema import *
from pulsar.schema import AvroSchema
from pulsar.schema import JsonSchema
from pulsar import Client, AuthenticationOauth2
###
#https://pulsar.apache.org/docs/en/client-libraries-python/
# http://pulsar.apache.org/api/python/schema/schema.m.html#pulsar.schema.schema.AvroSchema
class breakoutsensor(Record):
    uuid = String()
    ipaddress = String()
    cputempf = Integer()
    runtime = Integer()
    host = String()
    hostname = String()
    macaddress = String()
    endtime = String()
    te = String()
    cpu = Float()
    diskusage = String()
    memory = Float()
    rowid = String()
    systemtime = String()
    ts = Integer()
    starttime = String()
    BH1745_red = Float()
    BH1745_green = Float()
    BH1745_blue = Float()
    BH1745_clear = Float()
    VL53L1X_distance_in_mm = Float()
    ltr559_lux = Float()
    ltr559_prox = Float()
    bme680_tempc = Float()
    bme680_tempf = Float()
    bme680_pressure = Float()
    bme680_humidity = Float()
    lsm303d_accelerometer = String()
    lsm303d_magnetometer = String()
# parse arguments
parse = argparse.ArgumentParser(prog='breakoutsensor.py')
parse.add_argument('-su', '--service-url', dest='service_url', type=str, required=True,
                   help='The Pulsar service URL you want to connect to')
parse.add_argument('-t', '--topic', dest='topic', type=str, required=True,
                   help='The topic you want to produce to')
parse.add_argument('-n', '--number', dest='number', type=int, default=1,
                   help='The number of messages you want to produce')
parse.add_argument('--auth-params', dest='auth_params', type=str, default="",
                   help='The auth params needed to configure the client')
args = parse.parse_args()
# yyyy-mm-dd hh:mm:ss
currenttime = strftime("%Y-%m-%d %H:%M:%S", gmtime())
host = os.uname()[1]
def do_nothing(obj):
    pass

def getCPUtemperature():
    res = os.popen('vcgencmd measure_temp').readline()
    return res.replace("temp=", "").replace("'C\n", "")

def IP_address():
    try:
        s = socket.socket(socket_family, socket.SOCK_DGRAM)
        s.connect(external_IP_and_port)
        answer = s.getsockname()
        s.close()
        return answer[0] if answer else None
    except socket.error:
        return None

# Get the MAC address of a local interface
def psutil_iface(iface):
    # type: (str) -> Optional[str]
    nics = psutil.net_if_addrs()
    if iface in nics:
        nic = nics[iface]
        for i in nic:
            if i.family == psutil.AF_LINK:
                return i.address
# - start timing
starttime = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
start = time.time()
external_IP_and_port = ('198.41.0.4', 53) # a.root-servers.net
socket_family = socket.AF_INET
# Set up OLED
oled = sh1106(i2c(port=1, address=0x3C), rotate=2, height=128, width=128)
oled.cleanup = do_nothing
# Set Constants
MAX_DISTANCE_MM = 800 # Distance at which our bar is full
TRIGGER_DISTANCE_MM = 80
# Ip address
host_name = socket.gethostname()
ipaddress = IP_address()
# bh1745
bh1745 = BH1745()
bh1745.setup()
bh1745.set_leds(1)
r, g, b, c = bh1745.get_rgbc_raw()
bh1745.set_leds(0)
# VL53L1X
tof = VL53L1X.VL53L1X(i2c_bus=1, i2c_address=0x29)
# ltr559
ltr559 = LTR559()
# lsm303d
lsm = LSM303D(0x1d)
#print(args.service_url)
#print(args.auth_params)
# connect to pulsar
if len(args.auth_params) == 0:
    client = pulsar.Client(args.service_url)
else:
    client = pulsar.Client(args.service_url,
                           authentication=AuthenticationOauth2(args.auth_params))
#sensorschema = AvroSchema(breakoutsensor)
#print("Schema info is: " + sensorschema.schema_info().schema())
#producer = client.create_producer(topic='persistent://public/default/pi-sensors-avro' ,schema=sensorschema,properties={"producer-name": "sensoravro-py-sensor","producer-id": "sensor-avro-sensor" })
producer = client.create_producer(topic=args.topic,
                                  schema=JsonSchema(breakoutsensor),
                                  properties={"producer-name": "sensor-py-sensor", "producer-id": "sensor-sensor"})
# mqtt
#client = paho.Client()
# loop forever
try:
    while True:
        tof.open()            # Initialise the i2c bus and configure the sensor
        tof.start_ranging(2)  # Start ranging: 1 = Short Range, 2 = Medium Range, 3 = Long Range
        distance_in_mm = tof.get_distance()                    # Grab the range in mm
        distance_in_mm = min(MAX_DISTANCE_MM, distance_in_mm)  # Cap at our MAX_DISTANCE
        tof.stop_ranging()    # Stop ranging
        ltr559.update_sensor()
        lux = ltr559.get_lux()
        prox = ltr559.get_proximity()
        lsm3accl = lsm.accelerometer()
        lsm3mag = lsm.magnetometer()
        # bme680
        try:
            sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY)
        except IOError:
            sensor = bme680.BME680(bme680.I2C_ADDR_SECONDARY)
        sensor.set_humidity_oversample(bme680.OS_2X)
        sensor.set_pressure_oversample(bme680.OS_4X)
        sensor.set_temperature_oversample(bme680.OS_8X)
        sensor.set_filter(bme680.FILTER_SIZE_3)
        sensor.set_gas_status(bme680.ENABLE_GAS_MEAS)
        sensor.set_gas_heater_temperature(320)
        sensor.set_gas_heater_duration(150)
        sensor.select_gas_heater_profile(0)
        bh1745.set_leds(1)
        r, g, b, c = bh1745.get_rgbc_raw()
        bh1745.set_leds(0)
        uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S", gmtime()), uuid.uuid4())
        uniqueid = 'snr_{0}'.format(strftime("%Y%m%d%H%M%S", gmtime()))
        cpuTemp = int(float(getCPUtemperature()))
        cputempf = int(round(9.0 / 5.0 * float(cpuTemp) + 32))
        usage = psutil.disk_usage("/")
        end = time.time()
        sensorRec = breakoutsensor()
        sensorRec.uuid = uniqueid
        sensorRec.ipaddress = ipaddress
        sensorRec.cputempf = int(cputempf)
        sensorRec.runtime = int(round(end - start))
        sensorRec.host = os.uname()[1]
        sensorRec.hostname = host_name
        sensorRec.macaddress = psutil_iface('wlan0')
        sensorRec.endtime = '{0}'.format(str(end))
        sensorRec.te = '{0}'.format(str(end - start))
        sensorRec.cpu = float(psutil.cpu_percent(interval=1))
        sensorRec.diskusage = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
        sensorRec.memory = float(psutil.virtual_memory().percent)
        sensorRec.rowid = str(uuid2)
        sensorRec.systemtime = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
        sensorRec.ts = int(time.time())
        sensorRec.starttime = str(starttime)
        sensorRec.BH1745_red = float('{:3.1f}'.format(r))
        sensorRec.BH1745_green = float('{:3.1f}'.format(g))
        sensorRec.BH1745_blue = float('{:3.1f}'.format(b))
        sensorRec.BH1745_clear = float('{:3.1f}'.format(c))
        sensorRec.VL53L1X_distance_in_mm = float(distance_in_mm)
        sensorRec.ltr559_lux = float('{:06.2f}'.format(lux))
        sensorRec.ltr559_prox = float('{:04d}'.format(prox))
        sensorRec.bme680_tempc = float('{0:.2f}'.format(sensor.data.temperature))
        sensorRec.bme680_tempf = float('{0:.2f}'.format((sensor.data.temperature * 1.8) + 32))
        sensorRec.bme680_pressure = float('{0:.2f}'.format(sensor.data.pressure))
        sensorRec.bme680_humidity = float('{0:.3f}'.format(sensor.data.humidity))
        sensorRec.lsm303d_accelerometer = "{:+06.2f}g : {:+06.2f}g : {:+06.2f}g".format(*lsm3accl)
        sensorRec.lsm303d_magnetometer = "{:+06.2f} : {:+06.2f} : {:+06.2f}".format(*lsm3mag)
        print(sensorRec)
        producer.send(sensorRec, partition_key=uniqueid)
        #client.connect("pulsar1", 1883, 60)
        #client.publish("aqsensor", payload=str(sensorRec))
        with canvas(oled) as draw:
            draw.rectangle(oled.bounding_box, outline="white", fill="black")
            draw.text((0, 0), "- Apache Pulsar -", fill="white")
            draw.text((0, 10), ipaddress, fill="white")
            draw.text((0, 20), starttime, fill="white")
            draw.text((0, 30), 'Temp: {}'.format(sensor.data.temperature), fill="white")
            draw.text((0, 40), 'Humidity: {}'.format(sensor.data.humidity), fill="white")
            draw.text((0, 50), 'Pressure: {}'.format(sensor.data.pressure), fill="white")
            draw.text((0, 60), 'Distance: {}'.format(str(distance_in_mm)), fill="white")
            draw.text((0, 70), 'CPUTemp: {}'.format(cpuTemp), fill="white")
            draw.text((0, 80), 'TempF: {}'.format(sensorRec.bme680_tempf), fill="white")
            draw.text((0, 90), 'A: {}'.format(sensorRec.lsm303d_accelerometer), fill="white")
            draw.text((0, 100), 'M: {}'.format(sensorRec.lsm303d_magnetometer), fill="white")
            draw.text((0, 110), 'DU: {}'.format(sensorRec.diskusage), fill="white")
        #time.sleep(0.5)
except KeyboardInterrupt:
    pass

client.close()
In the next step, we will run our device and start producing records as we can see below.
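To start the producer, run the script with the service URL and topic arguments defined above (a sketch; the service URL is illustrative):
python3 breakoutsensor.py --service-url pulsar://pulsar1:6650 --topic persistent://public/default/pi-sensors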
Device Running
VL53L0X_GetDeviceInfo:
Device Name : VL53L1 cut1.1
Device Type : VL53L1
Device ID :
ProductRevisionMajor : 1
ProductRevisionMinor : 15
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'snr_20220323200032', 'ipaddress': '192.168.1.229', 'cputempf': 99, 'runtime': 154, 'host': 'piups', 'hostname': 'piups', 'macaddress': 'b8:27:eb:4a:4b:61', 'endtime': '1648065632.645613', 'te': '154.00473523139954', 'cpu': 0.0, 'diskusage': '3895.3 MB', 'memory': 21.5, 'rowid': '20220323200032_6a66f9ea-1273-4e5d-b150-9300f6272482', 'systemtime': '03/23/2022 16:00:33', 'ts': 1648065633, 'starttime': '03/23/2022 15:57:58', 'BH1745_red': 112.2, 'BH1745_green': 82.0, 'BH1745_blue': 63.0, 'BH1745_clear': 110.0, 'VL53L1X_distance_in_mm': -1185.0, 'ltr559_lux': 6.65, 'ltr559_prox': 0.0, 'bme680_tempc': 23.6, 'bme680_tempf': 74.48, 'bme680_pressure': 1017.48, 'bme680_humidity': 33.931, 'lsm303d_accelerometer': '-00.08g : -01.00g : +00.01g', 'lsm303d_magnetometer': '+00.06 : +00.30 : +00.07'}
VL53L1X Start Ranging Address 0x29
We can easily test that data is entering our topic using the command line consumer tool. No code is required.
Consumer
bin/pulsar-client consume "persistent://public/default/pi-sensors" -s "pisnsrgrdnrdr" -n 0
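If you would rather test from Python, a minimal consumer sketch using the same breakoutsensor schema class defined earlier (the service URL and subscription name are illustrative):
import pulsar
from pulsar.schema import JsonSchema

client = pulsar.Client('pulsar://pulsar1:6650')
consumer = client.subscribe('persistent://public/default/pi-sensors',
                            subscription_name='pisnsrgrdnrdr-py',
                            schema=JsonSchema(breakoutsensor))
while True:
    msg = consumer.receive()
    # Typed access to the record via the schema class.
    print(msg.value().uuid, msg.value().bme680_tempf)
    consumer.acknowledge(msg)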
If we wish to do some reporting, querying, or building of SQL dashboards, we can use Pulsar SQL, which runs on Presto. We can use any JDBC tool to do this, or the built-in Pulsar SQL command-line interface shown below.
SQL Consumers
Pulsar SQL/Presto/Trino
desc pulsar."public/default"."pi-sensors";
Column | Type | Extra | Comment
------------------------+-----------+-------+-----------------------------------------------------------------------------
uuid | varchar | | ["null","string"]
ipaddress | varchar | | ["null","string"]
cputempf | integer | | ["null","int"]
runtime | integer | | ["null","int"]
host | varchar | | ["null","string"]
hostname | varchar | | ["null","string"]
macaddress | varchar | | ["null","string"]
endtime | varchar | | ["null","string"]
te | varchar | | ["null","string"]
cpu | real | | ["null","float"]
diskusage | varchar | | ["null","string"]
memory | real | | ["null","float"]
rowid | varchar | | ["null","string"]
systemtime | varchar | | ["null","string"]
ts | integer | | ["null","int"]
starttime | varchar | | ["null","string"]
bh1745_red | real | | ["null","float"]
bh1745_green | real | | ["null","float"]
bh1745_blue | real | | ["null","float"]
bh1745_clear | real | | ["null","float"]
vl53l1x_distance_in_mm | real | | ["null","float"]
ltr559_lux | real | | ["null","float"]
ltr559_prox | real | | ["null","float"]
bme680_tempc | real | | ["null","float"]
bme680_tempf | real | | ["null","float"]
bme680_pressure | real | | ["null","float"]
bme680_humidity | real | | ["null","float"]
lsm303d_accelerometer | varchar | | ["null","string"]
lsm303d_magnetometer | varchar | | ["null","string"]
__partition__ | integer | | The partition number which the message belongs to
__event_time__ | timestamp | | Application defined timestamp in milliseconds of when the event occurred
__publish_time__ | timestamp | | The timestamp in milliseconds of when event as published
__message_id__ | varchar | | The message ID of the message used to generate this row
__sequence_id__ | bigint | | The sequence ID of the message used to generate this row
__producer_name__ | varchar | | The name of the producer that publish the message used to generate this row
__key__ | varchar | | The partition key for the topic
__properties__ | varchar | | User defined properties
(37 rows)
presto> select * from pulsar."public/default"."pi-sensors";
uuid | ipaddress | cputempf | runtime | host | hostname | macaddress | endtime | te | cpu | disk
--------------------+---------------+----------+---------+-------+----------+-------------------+--------------------+--------------------+-----+-----
snr_20220323180318 | 192.168.1.229 | 99 | 4 | piups | piups | b8:27:eb:4a:4b:61 | 1648058598.8543017 | 4.47935152053833 | 0.2 | 3895
snr_20220323180324 | 192.168.1.229 | 99 | 10 | piups | piups | b8:27:eb:4a:4b:61 | 1648058604.4054732 | 10.03052306175232 | 0.0 | 3895
snr_20220323180329 | 192.168.1.229 | 99 | 16 | piups | piups | b8:27:eb:4a:4b:61 | 1648058609.8929565 | 15.518006324768066 | 6.5 | 3895
snr_20220323180335 | 192.168.1.229 | 99 | 21 | piups | piups | b8:27:eb:4a:4b:61 | 1648058615.3783045 | 21.00335431098938 | 0.2 | 3895
snr_20220323180340 | 192.168.1.229 | 99 | 26 | piups | piups | b8:27:eb:4a:4b:61 | 1648058620.8675282 | 26.49257802963257 | 4.6 | 3895
snr_20220323180346 | 192.168.1.229 | 99 | 32 | piups | piups | b8:27:eb:4a:4b:61 | 1648058626.3639522 | 31.989001989364624 | 0.0 | 3895
snr_20220323180351 | 192.168.1.229 | 99 | 38 | piups | piups | b8:27:eb:4a:4b:61 | 1648058631.8793604 | 37.50441026687622 | 0.0 | 3895
snr_20220323180357 | 192.168.1.229 | 100 | 43 | piups | piups | b8:27:eb:4a:4b:61 | 1648058637.38347 | 43.008519887924194 | 0.0 | 3895
snr_20220323180402 | 192.168.1.229 | 99 | 49 | piups | piups | b8:27:eb:4a:4b:61 | 1648058642.8820572 | 48.50710701942444 | 0.0 | 3895
snr_20220323180408 | 192.168.1.229 | 99 | 54 | piups | piups | b8:27:eb:4a:4b:61 | 1648058648.3795574 | 54.00460720062256 | 6.2 | 3895
snr_20220323180413 | 192.168.1.229 | 99 | 59 | piups | piups | b8:27:eb:4a:4b:61 | 1648058653.8280468 | 59.45309662818909 | 0.0 | 3895
snr_20220323180419 | 192.168.1.229 | 99 | 65 | piups | piups | b8:27:eb:4a:4b:61 | 1648058659.3180714 | 64.94312119483948 | 4.9 | 3895
snr_20220323180424 | 192.168.1.229 | 99 | 70 | piups | piups | b8:27:eb:4a:4b:61 | 1648058664.8023574 | 70.42740726470947 | 0.0 | 3895
snr_20220323180430 | 192.168.1.229 | 99 | 76 | piups | piups | b8:27:eb:4a:4b:61 | 1648058670.286937 | 75.91198682785034 | 0.0 | 3895
snr_20220323180435 | 192.168.1.229 | 97 | 81 | piups | piups | b8:27:eb:4a:4b:61 | 1648058675.7804654 | 81.40551519393921 | 0.0 | 3895
snr_20220323180441 | 192.168.1.229 | 99 | 87 | piups | piups | b8:27:eb:4a:4b:61 | 1648058681.2751634 | 86.90021324157715 | 0.0 | 3895
snr_20220323180446 | 192.168.1.229 | 99 | 92 | piups | piups | b8:27:eb:4a:4b:61 | 1648058686.7713509 | 92.39640069007874 | 5.9 | 3895
snr_20220323180452 | 192.168.1.229 | 99 | 98 | piups | piups | b8:27:eb:4a:4b:61 | 1648058692.2672575 | 97.89230728149414 | 0.3 | 3895
snr_20220323180457 | 192.168.1.229 | 99 | 103 | piups | piups | b8:27:eb:4a:4b:61 | 1648058697.7704427 | 103.39549255371094 | 5.4 | 3895
snr_20220323180503 | 192.168.1.229 | 99 | 109 | piups | piups | b8:27:eb:4a:4b:61 | 1648058703.21333 | 108.83837985992432 | 0.3 | 3895
snr_20220323180508 | 192.168.1.229 | 99 | 114 | piups | piups | b8:27:eb:4a:4b:61 | 1648058708.6879904 | 114.31304025650024 | 0.0 | 3895
snr_20220323180514 | 192.168.1.229 | 99 | 120 | piups | piups | b8:27:eb:4a:4b:61 | 1648058714.1396198 | 119.76466965675354 | 0.3 | 3895
snr_20220323180519 | 192.168.1.229 | 99 | 125 | piups | piups | b8:27:eb:4a:4b:61 | 1648058719.6158638 | 125.24091362953186 | 0.0 | 3895
snr_20220323180525 | 192.168.1.229 | 100 | 131 | piups | piups | b8:27:eb:4a:4b:61 | 1648058725.0950723 | 130.72012209892273 | 6.5 | 3895
snr_20220323180530 | 192.168.1.229 | 99 | 136 | piups | piups | b8:27:eb:4a:4b:61 | 1648058730.57256 | 136.19760990142822 | 0.0 | 3895
(25 rows)
Query 20220323_184946_00003_p66fs, FINISHED, 1 node
Table Layout
Example Rows
Detailed Schema for the Topic in Pulsar SQL
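You can also run the same queries from Python instead of the Presto CLI; here is a sketch using the trino client library (the host, port, and user are assumptions for a local Pulsar SQL worker):
import trino  # pip3 install trino

conn = trino.dbapi.connect(host="pulsar1", port=8081, user="demo", catalog="pulsar")
cur = conn.cursor()
# Same fully-qualified table name as in the Presto CLI session above.
cur.execute('SELECT uuid, bme680_tempf FROM pulsar."public/default"."pi-sensors" LIMIT 5')
for row in cur.fetchall():
    print(row)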
Below, we create a simple Spark Structured Streaming application in Scala that consumes the topic's JSON messages and stores them in a file system as CSV as events arrive.
Spark SQL
val dfPulsar = spark.readStream.format("pulsar").option("service.url", "pulsar://pulsar1:6650").option("admin.url", "http://pulsar1:8080").option("topic", "persistent://public/default/pi-sensors").load()
scala> dfPulsar.printSchema()
root
|-- uuid: string (nullable = true)
|-- ipaddress: string (nullable = true)
|-- cputempf: integer (nullable = true)
|-- runtime: integer (nullable = true)
|-- host: string (nullable = true)
|-- hostname: string (nullable = true)
|-- macaddress: string (nullable = true)
|-- endtime: string (nullable = true)
|-- te: string (nullable = true)
|-- cpu: float (nullable = true)
|-- diskusage: string (nullable = true)
|-- memory: float (nullable = true)
|-- rowid: string (nullable = true)
|-- systemtime: string (nullable = true)
|-- ts: integer (nullable = true)
|-- starttime: string (nullable = true)
|-- BH1745_red: float (nullable = true)
|-- BH1745_green: float (nullable = true)
|-- BH1745_blue: float (nullable = true)
|-- BH1745_clear: float (nullable = true)
|-- VL53L1X_distance_in_mm: float (nullable = true)
|-- ltr559_lux: float (nullable = true)
|-- ltr559_prox: float (nullable = true)
|-- bme680_tempc: float (nullable = true)
|-- bme680_tempf: float (nullable = true)
|-- bme680_pressure: float (nullable = true)
|-- bme680_humidity: float (nullable = true)
|-- lsm303d_accelerometer: string (nullable = true)
|-- lsm303d_magnetometer: string (nullable = true)
|-- __key: binary (nullable = true)
|-- __topic: string (nullable = true)
|-- __messageId: binary (nullable = true)
|-- __publishTime: timestamp (nullable = true)
|-- __eventTime: timestamp (nullable = true)
|-- __messageProperties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
// Example Queries
val pQuery = dfPulsar.selectExpr("*").writeStream.format("console").option("truncate", false).start()
val pQuery = dfPulsar.selectExpr("CAST(__key AS STRING)",
"CAST(uuid AS STRING)",
"CAST(ipaddress AS STRING)",
"CAST(cputempf AS STRING)",
"CAST(host AS STRING)",
"CAST(cpu AS STRING)",
"CAST(diskusage AS STRING)",
"CAST(memory AS STRING)",
"CAST(systemtime AS STRING)",
"CAST(BH1745_red AS STRING)",
"CAST(BH1745_green AS STRING)",
"CAST(BH1745_blue AS STRING)",
"CAST(BH1745_clear AS STRING)",
"CAST(VL53L1X_distance_in_mm AS STRING)",
"CAST(ltr559_lux AS STRING)",
"CAST(bme680_tempf AS STRING)",
"CAST(bme680_pressure AS STRING)",
"CAST(bme680_humidity AS STRING)")
.as[(String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String)]
.writeStream.format("csv")
.option("truncate", "false")
.option("header", true)
.option("path", "/opt/demo/pisensordata")
.option("checkpointLocation", "/tmp/checkpoint")
.start()
// You could write csv, parquet, json, or orc
pQuery.explain()
pQuery.awaitTermination()
pQuery.stop()
// can be "orc", "json", "csv", etc.
The above simple ETL job reads events as they arrive, converts them from JSON to CSV, and lands them in a file system, which could be S3, HDFS, or a vanilla Linux file system.
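If you prefer to stay in Python, the same pipeline can be sketched in PySpark (assuming the pulsar-spark connector is on the classpath; the selected columns and output paths are illustrative):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pi-sensors-etl").getOrCreate()

# Read the Pulsar topic as a streaming DataFrame, mirroring the Scala example.
dfPulsar = (spark.readStream.format("pulsar")
            .option("service.url", "pulsar://pulsar1:6650")
            .option("admin.url", "http://pulsar1:8080")
            .option("topic", "persistent://public/default/pi-sensors")
            .load())

# Land a few columns as CSV files as events arrive.
pQuery = (dfPulsar.selectExpr("uuid", "systemtime", "bme680_tempf", "VL53L1X_distance_in_mm")
          .writeStream.format("csv")
          .option("header", True)
          .option("path", "/opt/demo/pisensordata-py")
          .option("checkpointLocation", "/tmp/checkpoint-py")
          .start())
pQuery.awaitTermination()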
Spark Runtime Metrics
Spark Explain Plan for our SQL ETL Job
Output from our Spark job
Job Details
Performance Metrics for Spark server
Below we can see the output of one of our loaded records as a CSV file.
Example Spark ETL CSV Output
/opt/demo/pisensordata# cat part-00000-0425bfc8-5d25-4143-818c-bc7af5e1d82c-c000.csv
__key,uuid,ipaddress,cputempf,host,cpu,diskusage,memory,systemtime,BH1745_red,BH1745_green,BH1745_blue,BH1745_clear,VL53L1X_distance_in_mm,ltr559_lux,bme680_tempf,bme680_pressure,bme680_humidity
snr_20220324215723,snr_20220324215723,192.168.1.229,95,piups,0.0,3887.5 MB,20.6,03/24/2022 17:57:24,134.2,99.0,75.6,130.0,15.0,6.09,70.66,1006.11,44.737
We can also run continuous Apache Flink SQL applications against our topic data using standard SQL. First, we connect Flink to Pulsar via a catalog pointing at the admin and service URLs. We can then examine our topics as Flink SQL tables and run continuous queries on them.
Flink SQL
CREATE CATALOG pulsar WITH (
'type' = 'pulsar',
'service-url' = 'pulsar://pulsar1:6650',
'admin-url' = 'http://pulsar1:8080',
'format' = 'json'
);
USE CATALOG pulsar;
SHOW TABLES;
describe `pi-sensors`;
+------------------------+--------+------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------------------------+--------+------+-----+--------+-----------+
| uuid | STRING | true | | | |
| ipaddress | STRING | true | | | |
| cputempf | INT | true | | | |
| runtime | INT | true | | | |
| host | STRING | true | | | |
| hostname | STRING | true | | | |
| macaddress | STRING | true | | | |
| endtime | STRING | true | | | |
| te | STRING | true | | | |
| cpu | FLOAT | true | | | |
| diskusage | STRING | true | | | |
| memory | FLOAT | true | | | |
| rowid | STRING | true | | | |
| systemtime | STRING | true | | | |
| ts | INT | true | | | |
| starttime | STRING | true | | | |
| BH1745_red | FLOAT | true | | | |
| BH1745_green | FLOAT | true | | | |
| BH1745_blue | FLOAT | true | | | |
| BH1745_clear | FLOAT | true | | | |
| VL53L1X_distance_in_mm | FLOAT | true | | | |
| ltr559_lux | FLOAT | true | | | |
| ltr559_prox | FLOAT | true | | | |
| bme680_tempc | FLOAT | true | | | |
| bme680_tempf | FLOAT | true | | | |
| bme680_pressure | FLOAT | true | | | |
| bme680_humidity | FLOAT | true | | | |
| lsm303d_accelerometer | STRING | true | | | |
| lsm303d_magnetometer | STRING | true | | | |
+------------------------+--------+------+-----+--------+-----------+
select max(bme680_pressure) as maxpressure, max(bme680_tempf) as maxtemp, max(ltr559_lux) as maxlux, avg(BH1745_red) as avgred,
       max(VL53L1X_distance_in_mm) as maxdistance
from `pi-sensors`;

select * from `pi-sensors`;
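The same catalog and queries can also be driven from Python with PyFlink, which is handy if you want the whole pipeline in one language; a sketch, assuming the Pulsar Flink connector jars are on the Flink classpath:
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a streaming TableEnvironment and register the Pulsar catalog.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
CREATE CATALOG pulsar WITH (
  'type' = 'pulsar',
  'service-url' = 'pulsar://pulsar1:6650',
  'admin-url' = 'http://pulsar1:8080',
  'format' = 'json'
)
""")
t_env.use_catalog("pulsar")

# Run a continuous aggregate over the sensor topic and stream results to stdout.
t_env.execute_sql("SELECT max(bme680_tempf) AS maxtemp FROM `pi-sensors`").print()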
In the below screen captures, you can see the results of our Flink SQL queries.
Flink SQL Row Summary
Creating Our Flink SQL Catalog to Apache Pulsar
Flink SQL Running
Flink SQL Totals Updating
In the final part of our application, we can write an Apache NiFi flow to consume that same data from the pi-sensors Pulsar topic and write it as rows in MongoDB (or any other NoSQL database).
Apache NiFi: Pulsar Consumer, MongoDB Writer
Apache NiFi Process Group for Our Application
NiFi Data Flow
- Consume Pulsar records from the pi-sensors topic via the Pulsar connection pool.
- Use Apache Calcite SQL to query, filter, route, and transform incoming records.
- Store records as BSON documents in a MongoDB NoSQL collection.
- On failure, route to a temporary NiFi queue to process in the future.
Consume Pulsar Records from listed topic
Store Records in PiSensors MongoDB collection via MongoDB Connection Pool
Configure MongoDB Connection Pool
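If you want to prototype the same Pulsar-to-MongoDB sink without NiFi, a minimal Python sketch with pymongo (connection details mirror the mongo shell session below; the subscription name is illustrative):
import json
import pulsar
from pymongo import MongoClient

client = pulsar.Client('pulsar://pulsar1:6650')
consumer = client.subscribe('persistent://public/default/pi-sensors', 'mongo-writer')

mongo = MongoClient('mongodb://debezium:dbz@pulsar1:27017/?authSource=admin')
collection = mongo['inventory']['pisensors']

while True:
    msg = consumer.receive()
    # Each Pulsar message payload is a JSON record; store it as a BSON document.
    collection.insert_one(json.loads(msg.data()))
    consumer.acknowledge(msg)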
In the code below we will log into our MongoDB server and query our new collection inserted by our Apache NiFi flow.
Data Store: MongoDB
mongo -u debezium -p dbz --authenticationDatabase admin pulsar1:27017/inventory
show databases
db.createCollection("pisensors")
show collections
db.pisensors.find().pretty()
{
    "_id" : ObjectId("623b812e5dae8913d42a93ee"),
    "uuid" : "snr_20220323194514",
    "ipaddress" : "192.168.1.229",
    "cputempf" : 100,
    "runtime" : 9,
    "host" : "piups",
    "hostname" : "piups",
    "macaddress" : "b8:27:eb:4a:4b:61",
    "endtime" : "1648064714.7820184",
    "te" : "9.371636629104614",
    "cpu" : 6.5,
    "diskusage" : "3895.4 MB",
    "memory" : 21.4,
    "rowid" : "20220323194514_c9ec900f-05c2-49c4-985f-ddd83e8b15c0",
    "systemtime" : "03/23/2022 15:45:15",
    "ts" : 1648064715,
    "starttime" : "03/23/2022 15:45:05",
    "BH1745_red" : 112.2,
    "BH1745_green" : 83,
    "BH1745_blue" : 64.8,
    "BH1745_clear" : 110,
    "VL53L1X_distance_in_mm" : 31,
    "ltr559_lux" : 6.65,
    "ltr559_prox" : 0,
    "bme680_tempc" : 23.47,
    "bme680_tempf" : 74.25,
    "bme680_pressure" : 1017.71,
    "bme680_humidity" : 34.432,
    "lsm303d_accelerometer" : "-00.08g : -01.01g : +00.01g",
    "lsm303d_magnetometer" : "+00.06 : +00.30 : +00.07"
}
Finally, if it's not watched, did it happen? Let's monitor our Pulsar cluster and see what's going on.
Monitor Everything! Let me see what's going on!?!??!
Performance Metrics for Pulsar Brokers
Metrics on Apache Pulsar Topics and Data
Grafana Chart of Pulsar Metrics
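One lightweight way to pull these numbers yourself is to scrape the brokers' Prometheus endpoint, which Pulsar exposes over HTTP on the admin port; a sketch (the metric name used as a filter is illustrative):
import requests

# Pulsar brokers serve Prometheus-format metrics over HTTP.
metrics = requests.get("http://pulsar1:8080/metrics/").text
for line in metrics.splitlines():
    if line.startswith("pulsar_rate_in"):
        print(line)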
Our application is complete, taking sensor data from the device all the way to final display in an easy and scalable way.
References
- https://www.academy.streamnative.io/
- https://github.com/tspannhw/PulsarOnRaspberryPi
- https://community.cloudera.com/t5/Community-Articles/IoT-Series-Sensors-Utilizing-Breakout-Garden-Hat-Part-1/ta-p/249262
- https://shop.pimoroni.com/products/breakout-garden-hat?variant=12767628787795
- https://github.com/pimoroni/breakout-garden
- https://github.com/pimoroni/bme680-python
- https://github.com/pimoroni/bh1745-python
- https://github.com/pimoroni/vl53l1x-python
- https://github.com/pimoroni/ltr559-python
- https://github.com/pimoroni/lsm303d-python
- https://github.com/rm-hull/luma.oled
- https://luma-oled.readthedocs.io/en/latest/install.html
- https://github.com/streamnative/examples/blob/master/cloud/python/OAuth2Consumer.py
- https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks