Connect FlowMQ with Paho MQTT C SDK
FlowMQ natively supports the MQTT 5.0 protocol, allowing a wide range of MQTT clients to connect and interact with it seamlessly. This guide provides a step-by-step walkthrough on how to use the popular Paho MQTT C client library to connect, publish messages, and subscribe to topics on FlowMQ.
Prerequisites
Before you begin, ensure you have the following:
- A running FlowMQ instance.
- A C development environment (e.g., GCC or Clang).
- The Eclipse Paho MQTT C client library installed.
Installation
You can clone the Paho MQTT C library and build it from source:
# Fetch the Paho MQTT C sources and enter the checkout
git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c
# Build the library
make
# Install the build results (run with elevated privileges via sudo)
sudo make install
Connecting to FlowMQ
To establish a connection, you need to initialize an MQTT client, set the connection options, and connect to the FlowMQ broker.
#include <stdio.h>
#include <stdlib.h>
#include "MQTTClient.h"

#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleClient"

/*
 * Minimal connection example: creates a client for the broker at ADDRESS,
 * connects with a 20-second keep-alive and a clean session, then shuts down.
 *
 * Returns EXIT_SUCCESS when the connection was established, EXIT_FAILURE
 * when the client could not be created or the broker refused the connection.
 */
int main(int argc, char* argv[]) {
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    int rc;

    (void)argc;
    (void)argv;

    /* Without a valid handle nothing else can work, so stop right away. */
    if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
                                MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS) {
        printf("Failed to create client, return code %d\n", rc);
        return EXIT_FAILURE;
    }

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;

    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
        printf("Failed to connect, return code %d\n", rc);
        /* Release the handle instead of continuing with an unconnected client. */
        MQTTClient_destroy(&client);
        return EXIT_FAILURE;
    }

    // Client is now connected; publish/subscribe code goes here

    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return EXIT_SUCCESS;
}
Publishing Messages
Once connected, you can publish messages to any topic.
#define TOPIC "flowmq/test"
#define PAYLOAD "Hello, FlowMQ!"
#define QOS 1
#define TIMEOUT 10000L
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
printf("Message with delivery token %d delivered\n", token);
Subscribing to Topics
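To receive messages, register a message-arrived callback and then subscribe to a topic. Register the callback before connecting: the Paho documentation requires the client to be disconnected when `MQTTClient_setCallbacks()` is called.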
Message Arrived Callback
This function will be called whenever a message from a subscribed topic is received.
/*
 * Message-arrived callback: prints the topic and payload of an incoming
 * message, then releases the message and the topic string.
 *
 * context   - unused here.
 * topicName - topic the message was published to; freed before returning.
 * topicLen  - unused here.
 * message   - received message; freed before returning.
 *
 * Always returns 1 (per the Paho MQTTClient_messageArrived documentation,
 * a non-zero return marks the message as handled).
 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
    const char *body = (const char *)message->payload;
    const int body_len = message->payloadlen;

    (void)context;
    (void)topicLen;

    printf("Message arrived\n");
    printf(" topic: %s\n", topicName);
    /* The payload carries an explicit length, so bound the print with it. */
    printf(" message: %.*s\n", body_len, body);

    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);

    return 1;
}
Subscribing
#define SUB_TOPIC "flowmq/#"
#define SUB_QOS 1

// Fragment: uses the surrounding `client` and `rc`.
// Paho documents that the client must be disconnected when
// MQTTClient_setCallbacks() is called, so place this call before
// MQTTClient_connect() in your program.
if ((rc = MQTTClient_setCallbacks(client, NULL, NULL, msgarrvd, NULL)) != MQTTCLIENT_SUCCESS) {
    printf("Failed to set callbacks, return code %d\n", rc);
    // Handle error
}

if ((rc = MQTTClient_subscribe(client, SUB_TOPIC, SUB_QOS)) != MQTTCLIENT_SUCCESS) {
    printf("Failed to subscribe, return code %d\n", rc);
    // Handle error
}
// Client is now subscribed and will receive messages via the callback
Full Example
Here is a complete example that connects, subscribes, publishes a message, and then disconnects.
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "MQTTClient.h"
/* Broker URI handed to MQTTClient_create(). */
#define ADDRESS "tcp://localhost:1883"
/* Client identifier handed to MQTTClient_create(). */
#define CLIENTID "ExampleClientSub"
/* Topic the example subscribes to. */
#define TOPIC "flowmq/test"
/* Message body for publishing. */
#define PAYLOAD "Hello World!"
/* Quality-of-service level used for the subscription. */
#define QOS 1
/* Wait limit for delivery completion; presumably milliseconds - confirm against the Paho docs. */
#define TIMEOUT 10000L
/* Token of the most recent delivery reported to delivered(). */
volatile MQTTClient_deliveryToken deliveredtoken;

/*
 * Delivery-complete callback: logs the confirmed token and records it in
 * deliveredtoken.
 *
 * context - unused here.
 * dt      - delivery token being confirmed.
 */
void delivered(void *context, MQTTClient_deliveryToken dt) {
    (void)context;

    printf("Message with token value %d delivery confirmed\n", dt);
    deliveredtoken = dt;
}
/*
 * Message-arrived callback: prints the topic, then the payload one byte at a
 * time, and finally releases the message and the topic string.
 *
 * context   - unused here.
 * topicName - topic the message was published to; freed before returning.
 * topicLen  - unused here.
 * message   - received message; freed before returning.
 *
 * Always returns 1.
 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
    const char *cursor = message->payload;

    (void)context;
    (void)topicLen;

    printf("Message arrived\n");
    printf(" topic: %s\n", topicName);
    printf(" message: ");

    /* Emit exactly payloadlen bytes; the payload is length-delimited. */
    for (int left = message->payloadlen; left > 0; --left) {
        putchar(*cursor);
        ++cursor;
    }
    putchar('\n');

    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);

    return 1;
}
/*
 * Connection-lost callback: reports that the connection dropped and why.
 *
 * context - unused here.
 * cause   - reason for the disconnect; may be NULL (the Paho documentation
 *           states it is currently always NULL), in which case "unknown"
 *           is printed instead of passing a null pointer to %s.
 */
void connlost(void *context, char *cause) {
    (void)context;

    printf("\nConnection lost\n");
    printf(" cause: %s\n", cause != NULL ? cause : "unknown");
}
int main(int argc, char* argv[]) {
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
int rc;
int ch;
MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
printf("Failed to connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
"Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
MQTTClient_subscribe(client, TOPIC, QOS);
do {
ch = getchar();
} while(ch!='Q' && ch != 'q');
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return rc;
}
Additional Resources
- Explore the official Paho MQTT C documentation for more advanced features.
- Learn more about FlowMQ's advanced MQTT features.