Skip to content

Connect FlowMQ with Paho MQTT C SDK

FlowMQ natively supports the MQTT 5.0 protocol, allowing a wide range of MQTT clients to connect and interact with it seamlessly. This guide provides a step-by-step walkthrough on how to use the popular Paho MQTT C client library to connect, publish messages, and subscribe to topics on FlowMQ.

Prerequisites

Before you begin, ensure you have the following:

  • A running FlowMQ instance.
  • A C development environment (e.g., GCC or Clang).
  • The Eclipse Paho MQTT C client library installed.

Installation

You can clone the Paho MQTT C library and build it from source.

bash
# Fetch the Paho MQTT C client sources and enter the checkout.
git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c
# Build the library.
make
# Install the build results; run through sudo for elevated privileges.
sudo make install

Connecting to FlowMQ

To establish a connection, you need to initialize an MQTT client, set the connection options, and connect to the FlowMQ broker.

c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "MQTTClient.h"

#define ADDRESS     "tcp://localhost:1883"
#define CLIENTID    "ExampleClient"

/**
 * Minimal connection example: creates an MQTT client, connects it to the
 * broker at ADDRESS, and releases the client again before exiting.
 *
 * Returns EXIT_SUCCESS after a clean run, or EXIT_FAILURE when the client
 * cannot be created or the connection attempt fails.
 */
int main(int argc, char* argv[]) {
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    int rc;

    (void)argc;
    (void)argv;

    if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
            MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS) {
        printf("Failed to create client, return code %d\n", rc);
        return EXIT_FAILURE;
    }
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;

    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
        printf("Failed to connect, return code %d\n", rc);
        /* Do not fall through as if connected; release the handle and stop. */
        MQTTClient_destroy(&client);
        return EXIT_FAILURE;
    }

    // Client is now connected; publish and subscribe here.

    /* Shut the session down cleanly and free the client handle. */
    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return EXIT_SUCCESS;
}

Publishing Messages

Once connected, you can publish messages to any topic.

c
#define TOPIC       "flowmq/test"
#define PAYLOAD     "Hello, FlowMQ!"
#define QOS         1
#define TIMEOUT     10000L

MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;

pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;

MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
printf("Message with delivery token %d delivered\n", token);

Subscribing to Topics

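To receive messages, register a message-arrived callback and then subscribe to a topic. The Paho client requires callbacks to be set while the client is disconnected, so call MQTTClient_setCallbacks() before MQTTClient_connect().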

Message Arrived Callback

This function will be called whenever a message from a subscribed topic is received.

c
/*
 * Message-arrived callback: prints the topic and payload of the incoming
 * message, then hands the message and topic buffers back to the library.
 * Always returns 1.
 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    const int body_len = message->payloadlen;
    const char *body = (const char *)message->payload;

    printf("Message arrived\n");
    printf("     topic: %s\n", topicName);
    /* The precision bounds the print to body_len bytes. */
    printf("   message: %.*s\n", body_len, body);

    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);

    return 1;
}

Subscribing

c
#define SUB_TOPIC   "flowmq/#"
#define SUB_QOS     1

// Register the callback before calling MQTTClient_connect(): the Paho API
// requires the client to be disconnected when callbacks are set.
if ((rc = MQTTClient_setCallbacks(client, NULL, NULL, msgarrvd, NULL)) != MQTTCLIENT_SUCCESS) {
    printf("Failed to set callbacks, return code %d\n", rc);
    // Handle error
}

// Once the client is connected, subscribe to the topic filter.
if ((rc = MQTTClient_subscribe(client, SUB_TOPIC, SUB_QOS)) != MQTTCLIENT_SUCCESS) {
    printf("Failed to subscribe, return code %d\n", rc);
    // Handle error
}

// Client is now subscribed and will receive messages via the callback

Full Example

Here is a complete example that connects, subscribes, publishes a message, and then disconnects.

c
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "MQTTClient.h"

/* Broker URI handed to MQTTClient_create(). */
#define ADDRESS         "tcp://localhost:1883"
/* Client identifier handed to MQTTClient_create(). */
#define CLIENTID        "ExampleClientSub"
/* Topic name used by this example. */
#define TOPIC           "flowmq/test"
/* Sample message text. */
#define PAYLOAD         "Hello World!"
/* Quality-of-service level used by this example. */
#define QOS             1
/* Timeout value; presumably milliseconds -- confirm against the Paho API. */
#define TIMEOUT         10000L

/* Last delivery token reported to the delivered() callback. */
volatile MQTTClient_deliveryToken deliveredtoken;

/*
 * Delivery-complete callback: logs the confirmed token and stores it in
 * the file-scope deliveredtoken variable.
 */
void delivered(void *context, MQTTClient_deliveryToken dt)
{
    (void)context; /* not used by this callback */

    printf("Message with token value %d delivery confirmed\n", dt);
    deliveredtoken = dt;
}

/*
 * Message-arrived callback: prints the topic, then writes the payload one
 * byte at a time, and finally releases the message and topic buffers.
 * Always returns 1.
 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    const char *cursor;
    int remaining;

    printf("Message arrived\n");
    printf("     topic: %s\n", topicName);
    printf("   message: ");

    /* Emit exactly payloadlen bytes; nothing is written for a length <= 0. */
    cursor = message->payload;
    for (remaining = message->payloadlen; remaining > 0; --remaining) {
        putchar(*cursor++);
    }
    putchar('\n');

    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);

    return 1;
}

/*
 * Connection-lost callback: reports that the connection dropped and why.
 *
 * context: application context pointer (unused here).
 * cause:   textual reason for the loss; may be NULL, in which case
 *          "unknown" is printed instead of passing NULL to %s.
 */
void connlost(void *context, char *cause) {
    (void)context;

    printf("\nConnection lost\n");
    printf("     cause: %s\n", cause != NULL ? cause : "unknown");
}

int main(int argc, char* argv[]) {
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    int rc;
    int ch;

    MQTTClient_create(&client, ADDRESS, CLIENTID,
        MQTTCLIENT_PERSISTENCE_NONE, NULL);
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;

    MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);

    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
        printf("Failed to connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
           "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
    MQTTClient_subscribe(client, TOPIC, QOS);

    do {
        ch = getchar();
    } while(ch!='Q' && ch != 'q');

    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return rc;
}

Additional Resources