Skip to content

Connect FlowMQ with MQTT Go Client

FlowMQ's native support for the MQTT protocol makes it an excellent backend for Go applications that require real-time communication. This guide provides a step-by-step walkthrough of using the Eclipse Paho Go client to connect to FlowMQ, publish messages, and subscribe to topics.

Prerequisites

Before you start, ensure you have the following:

  • A running instance of FlowMQ.
  • Go 1.16 or later installed.

Installation

Add the Eclipse Paho Go client to your project using go get.

bash
go get github.com/eclipse/paho.mqtt.golang

Connecting to FlowMQ

To get started, you need to configure the client options and create a client instance to connect to your FlowMQ broker.

go
package main

import (
	"fmt"
	"os"
	"time"
	"github.com/eclipse/paho.mqtt.golang"
)

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	fmt.Println("Connected to FlowMQ!")
}

var connectionLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connection lost: %v\n", err)
}

func main() {
	opts := mqtt.NewClientOptions()
	opts.AddBroker("tcp://localhost:1883")
	opts.SetClientID("my-go-client")
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectionLostHandler

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	// ... publish and subscribe logic here ...
}

Publishing Messages

Once connected, you can easily publish messages to any topic using the Publish method.

go
func publish(client mqtt.Client, topic string, payload string) {
	token := client.Publish(topic, 1, false, payload)
	token.Wait()
	fmt.Printf("Published message: '%s' to topic '%s'\n", payload, topic)
}

// Call from main after connecting
// publish(client, "flowmq/test/go", "Hello from Paho Go!")

Subscribing to Topics

To receive messages, subscribe to a topic and provide a callback function to handle incoming messages.

go
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

func subscribe(client mqtt.Client, topic string) {
	token := client.Subscribe(topic, 1, messagePubHandler)
	token.Wait()
	fmt.Println("Subscribed to topic:", topic)
}

// Call from main after connecting
// subscribe(client, "flowmq/test/go")

Full Example

Here is a complete Go program that connects, subscribes, publishes a message, and then waits to receive it before exiting.

go
package main

import (
	"fmt"
	"time"
	"github.com/eclipse/paho.mqtt.golang"
)

// Define a message handler callback
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("Received message: '%s' on topic: '%s'\n", msg.Payload(), msg.Topic())
}

func main() {
	opts := mqtt.NewClientOptions()
	opts.AddBroker("tcp://localhost:1883")
	opts.SetClientID("go-mqtt-client-example")

	// Set a connection handler
	opts.OnConnect = func(client mqtt.Client) {
		fmt.Println("Connected!")
		// Subscribe upon connection
		if token := client.Subscribe("flowmq/go/test", 1, messagePubHandler); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		fmt.Println("Subscribed to topic: flowmq/go/test")
	}

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	// Publish a message
	publish(client, "flowmq/go/test", "Hello from Paho Go!")
	
	// Wait for a moment to receive the message
	time.Sleep(2 * time.Second)

	client.Disconnect(250)
	fmt.Println("Disconnected.")
}

func publish(client mqtt.Client, topic string, payload string) {
	token := client.Publish(topic, 1, false, payload)
	token.Wait()
	fmt.Printf("Published message: '%s'\n", payload)
}

Additional Resources