How To Use MQTT in Golang
How to use the paho.mqtt.golang client library in a Go project to implement connection, subscription, and messaging between a client and an MQTT broker.
Go (often called Golang) is a statically and strongly typed, compiled, concurrent, garbage-collected programming language developed by Google. Go is expressive, clean, and efficient. Its concurrency mechanisms make it easy to write programs that get the most out of multicore and networked machines, and its type system enables flexible and modular program construction. Go compiles quickly to machine code, yet it has the convenience of garbage collection and the power of run-time reflection. It is a fast, statically typed, compiled language that feels like a dynamically typed, interpreted language.
MQTT is a lightweight IoT messaging protocol based on the publish/subscribe model. It provides real-time, reliable messaging for IoT devices with very little code and bandwidth, which makes it suitable for devices with limited hardware resources and for networks with limited bandwidth. As a result, MQTT is widely used in IoT, mobile internet, the Internet of Vehicles (IoV), the electric power industry, and other fields.
This article introduces how to use the paho.mqtt.golang client library in a Go project and how to implement connection, subscription, and messaging between the client and the MQTT broker.
Project Initialization
This project was developed and tested with Go 1.13.12:
go version
go version go1.13.12 darwin/amd64
This project uses paho.mqtt.golang as the MQTT client library. Install it with:
go get github.com/eclipse/paho.mqtt.golang
Use of Go MQTT
This article will use the free public MQTT broker provided by EMQX. The access information of the server is as follows:
- Broker: broker.emqx.io
- TCP Port: 1883
- WebSocket Port: 8083
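The access details above map onto the broker URL strings that the client options accept. The following is a minimal sketch rather than part of the original project: the helper name `brokerURL` is hypothetical, and the `/mqtt` WebSocket path mentioned in the usage note is an assumption about the public broker's default that should be checked against the EMQX documentation.

```go
package main

import "fmt"

// brokerURL is a hypothetical helper that formats a broker address in the
// scheme://host:port form passed to opts.AddBroker. The path may be empty;
// it mainly matters for WebSocket endpoints (assumed here, not confirmed
// by the article).
func brokerURL(scheme, host string, port int, path string) string {
	return fmt.Sprintf("%s://%s:%d%s", scheme, host, port, path)
}
```

For example, `brokerURL("tcp", "broker.emqx.io", 1883, "")` yields `tcp://broker.emqx.io:1883`, and `brokerURL("ws", "broker.emqx.io", 8083, "/mqtt")` yields `ws://broker.emqx.io:8083/mqtt`.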
Connect to the MQTT Broker
package main

import (
	"fmt"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// messagePubHandler is the default handler for incoming messages.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

// connectHandler runs once the connection is established.
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	fmt.Println("Connected")
}

// connectLostHandler runs when the connection is lost unexpectedly.
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connect lost: %v\n", err)
}

func main() {
	var broker = "broker.emqx.io"
	var port = 1883
	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
	opts.SetClientID("go_mqtt_client")
	opts.SetUsername("emqx")
	opts.SetPassword("public")
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	client := mqtt.NewClient(opts)
	// Connect is asynchronous; wait for the token and check its error.
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
}
- ClientOptions: Used to set the broker, port, client ID, username, password, and other options.
- messagePubHandler: Default callback for processing received messages.
- connectHandler: Callback invoked when the connection is established.
- connectLostHandler: Callback invoked when the connection is lost.
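One practical note on the client ID option: an MQTT broker disconnects an existing client when another client connects with the same client ID, so a fixed ID on a shared public broker can collide with other users running the same example. The following is a minimal sketch of one way to avoid that; the helper name `uniqueClientID` and the random-suffix scheme are assumptions for illustration, not something the library requires.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
)

// uniqueClientID is a hypothetical helper that appends 4 random bytes
// (8 hexadecimal characters) to the given prefix, separated by "_".
func uniqueClientID(prefix string) (string, error) {
	suffix := make([]byte, 4)
	if _, err := rand.Read(suffix); err != nil {
		return "", err
	}
	return prefix + "_" + hex.EncodeToString(suffix), nil
}
```

The result could then be passed to `opts.SetClientID` in place of the fixed string.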
To connect over TLS, build a tls.Config as follows:
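// Requires the imports "crypto/tls", "crypto/x509", "io/ioutil", and "log".
func NewTlsConfig() *tls.Config {
	// Load the CA certificate used to verify the broker.
	certpool := x509.NewCertPool()
	ca, err := ioutil.ReadFile("ca.pem")
	if err != nil {
		log.Fatalln(err.Error())
	}
	certpool.AppendCertsFromPEM(ca)
	// Import client certificate/key pair
	clientKeyPair, err := tls.LoadX509KeyPair("client-crt.pem", "client-key.pem")
	if err != nil {
		panic(err)
	}
	return &tls.Config{
		RootCAs: certpool,
		// ClientAuth and ClientCAs only take effect on the server side.
		ClientAuth: tls.NoClientCert,
		ClientCAs:  nil,
		// InsecureSkipVerify disables verification of the broker certificate;
		// set it to false outside of testing.
		InsecureSkipVerify: true,
		Certificates:       []tls.Certificate{clientKeyPair},
	}
}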
If no client certificate is used, the configuration can be reduced to the following:
func NewTlsConfig() *tls.Config {
	certpool := x509.NewCertPool()
	ca, err := ioutil.ReadFile("ca.pem")
	if err != nil {
		log.Fatalln(err.Error())
	}
	certpool.AppendCertsFromPEM(ca)
	return &tls.Config{
		RootCAs: certpool,
	}
}
Then apply the TLS configuration to the client options:
var broker = "broker.emqx.io"
var port = 8883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("ssl://%s:%d", broker, port))
tlsConfig := NewTlsConfig()
opts.SetTLSConfig(tlsConfig)
// other options
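The TLS helpers above ignore the return value of `AppendCertsFromPEM`, which reports whether any certificate was actually parsed. The following is a minimal sketch of a variant that surfaces that failure; the function name `tlsConfigFromPEM` and the choice to take the CA as bytes and return an error are assumptions for illustration, not part of the original project.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
)

// tlsConfigFromPEM is a hypothetical variant of NewTlsConfig that takes the
// CA certificate as PEM bytes and returns an error instead of exiting.
func tlsConfigFromPEM(caPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	// AppendCertsFromPEM returns false when no certificate could be parsed.
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no valid CA certificate found in PEM data")
	}
	return &tls.Config{RootCAs: pool}, nil
}
```

Returning an error lets the caller decide whether to abort or fall back, and taking bytes rather than a file path makes the function easy to exercise without files on disk.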
Subscription
Next, use the code below for subscriptions:
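func sub(client mqtt.Client) {
	topic := "topic/test"
	// QoS 1; a nil callback routes messages to the default publish handler.
	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic %s\n", topic)
}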
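The subscription above uses the exact topic topic/test. MQTT topic filters may also contain the single-level wildcard `+` and the multi-level wildcard `#`. The following is a rough sketch of how a filter is matched against a topic name, meant only to help when choosing a filter: the function name `topicMatches` is hypothetical, it is not part of paho.mqtt.golang, and it ignores the special rules for topics that begin with `$`.

```go
package main

import "strings"

// topicMatches is a hypothetical helper that reports whether an MQTT topic
// filter matches a topic name. "+" matches exactly one level, and "#"
// (valid only as the last level) matches all remaining levels, including none.
func topicMatches(filter, topic string) bool {
	filterLevels := strings.Split(filter, "/")
	topicLevels := strings.Split(topic, "/")
	for i, level := range filterLevels {
		if level == "#" {
			// The multi-level wildcard must be the final level of the filter.
			return i == len(filterLevels)-1
		}
		if i >= len(topicLevels) {
			return false
		}
		if level != "+" && level != topicLevels[i] {
			return false
		}
	}
	// Without "#", the filter and the topic must have the same depth.
	return len(filterLevels) == len(topicLevels)
}
```

With this sketch, both `topic/+` and `topic/#` match topic/test, while only `topic/#` also matches a deeper topic such as topic/test/a.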
Publish Messages
To publish messages, you can use the following code:
func publish(client mqtt.Client) {
	num := 10
	for i := 0; i < num; i++ {
		text := fmt.Sprintf("Message %d", i)
		// Arguments: topic, QoS 0, retained = false, payload.
		token := client.Publish("topic/test", 0, false, text)
		token.Wait()
		time.Sleep(time.Second)
	}
}
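The publish function above sends plain text. The payload argument of `Publish` also accepts a byte slice, so structured data can be serialized first. The following is a minimal sketch of that idea; the `reading` type, its field names, and the `encodeReading` helper are all hypothetical and chosen only for illustration.

```go
package main

import "encoding/json"

// reading is a hypothetical payload type used only for this illustration.
type reading struct {
	Seq  int    `json:"seq"`
	Text string `json:"text"`
}

// encodeReading serializes a reading to JSON bytes that can be used as the
// payload of a publish call.
func encodeReading(seq int, text string) ([]byte, error) {
	return json.Marshal(reading{Seq: seq, Text: text})
}
```

Inside the loop, the bytes returned by `encodeReading(i, text)` could be passed as the last argument of `client.Publish` instead of the string.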
Test
Lastly, we use the following code for testing:
package main

import (
	"fmt"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// messagePubHandler is the default handler for incoming messages.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connect lost: %v\n", err)
}

func main() {
	var broker = "broker.emqx.io"
	var port = 1883
	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
	opts.SetClientID("go_mqtt_client")
	opts.SetUsername("emqx")
	opts.SetPassword("public")
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	sub(client)
	publish(client)
	// Allow up to 250 ms for in-flight work to finish before disconnecting.
	client.Disconnect(250)
}

func publish(client mqtt.Client) {
	num := 10
	for i := 0; i < num; i++ {
		text := fmt.Sprintf("Message %d", i)
		token := client.Publish("topic/test", 0, false, text)
		token.Wait()
		time.Sleep(time.Second)
	}
}

func sub(client mqtt.Client) {
	topic := "topic/test"
	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)
}
After running the code, the client connects and subscribes successfully and receives the messages published to the subscribed topic.
Conclusion
So far, we have completed our goal by using the paho.mqtt.golang client to connect to the public MQTT broker and have implemented the connection, message publishing, and subscription between the test client and the MQTT broker.
Published at DZone with permission of Zhiwei Yu. See the original article here.