Connect FlowMQ with MQTT Go Client
FlowMQ's native support for the MQTT protocol makes it an excellent backend for Go applications that require real-time communication. This guide provides a step-by-step walkthrough of using the Eclipse Paho Go client to connect to FlowMQ, publish messages, and subscribe to topics.
Prerequisites
Before you start, ensure you have the following:
- A running instance of FlowMQ.
- Go 1.16 or later installed.
Installation
Add the Eclipse Paho Go client to your project using go get
.
go get github.com/eclipse/paho.mqtt.golang
Connecting to FlowMQ
To get started, you need to configure the client options and create a client instance to connect to your FlowMQ broker.
package main
import (
"fmt"
"os"
"time"
"github.com/eclipse/paho.mqtt.golang"
)
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected to FlowMQ!")
}
var connectionLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connection lost: %v\n", err)
}
func main() {
opts := mqtt.NewClientOptions()
opts.AddBroker("tcp://localhost:1883")
opts.SetClientID("my-go-client")
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectionLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
// ... publish and subscribe logic here ...
}
Publishing Messages
Once connected, you can easily publish messages to any topic using the Publish
method.
func publish(client mqtt.Client, topic string, payload string) {
token := client.Publish(topic, 1, false, payload)
token.Wait()
fmt.Printf("Published message: '%s' to topic '%s'\n", payload, topic)
}
// Call from main after connecting
// publish(client, "flowmq/test/go", "Hello from Paho Go!")
Subscribing to Topics
To receive messages, subscribe to a topic and provide a callback function to handle incoming messages.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
func subscribe(client mqtt.Client, topic string) {
token := client.Subscribe(topic, 1, messagePubHandler)
token.Wait()
fmt.Println("Subscribed to topic:", topic)
}
// Call from main after connecting
// subscribe(client, "flowmq/test/go")
Full Example
Here is a complete Go program that connects, subscribes, publishes a message, and then waits to receive it before exiting.
package main
import (
"fmt"
"time"
"github.com/eclipse/paho.mqtt.golang"
)
// Define a message handler callback
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: '%s' on topic: '%s'\n", msg.Payload(), msg.Topic())
}
func main() {
opts := mqtt.NewClientOptions()
opts.AddBroker("tcp://localhost:1883")
opts.SetClientID("go-mqtt-client-example")
// Set a connection handler
opts.OnConnect = func(client mqtt.Client) {
fmt.Println("Connected!")
// Subscribe upon connection
if token := client.Subscribe("flowmq/go/test", 1, messagePubHandler); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
return
}
fmt.Println("Subscribed to topic: flowmq/go/test")
}
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
// Publish a message
publish(client, "flowmq/go/test", "Hello from Paho Go!")
// Wait for a moment to receive the message
time.Sleep(2 * time.Second)
client.Disconnect(250)
fmt.Println("Disconnected.")
}
func publish(client mqtt.Client, topic string, payload string) {
token := client.Publish(topic, 1, false, payload)
token.Wait()
fmt.Printf("Published message: '%s'\n", payload)
}
Additional Resources
- For more advanced features and configuration options, refer to the official Eclipse Paho Go Client repository.
- Explore FlowMQ's advanced features for MQTT.