Connect FlowMQ with MQTT Flow Client
The Flow programming language brings actor-based concurrency to C++11, making it exceptionally well-suited for building high-performance, asynchronous systems. It was developed and is heavily used for the FoundationDB project. This guide demonstrates how to use an MQTT client written in Flow to connect, publish, and subscribe to FlowMQ.
Flow's asynchronous model, with its actor, state, and wait primitives, provides a natural and powerful way to handle the non-blocking, event-driven nature of the MQTT protocol.
Prerequisites
Before you begin, ensure you have the following:
- A running instance of FlowMQ.
- A C++11 compatible compiler (e.g., GCC, Clang).
- The Flow compiler/transpiler and necessary libraries set up in your development environment.
The actor Model
In Flow, asynchronous operations are defined within an actor. An actor is a function, declared with the ACTOR keyword, that can suspend its execution without blocking a thread, typically while waiting for an I/O operation to complete. Calling wait on a Future suspends the actor until the asynchronous operation that the Future represents is complete; other actors keep running on the same thread in the meantime.
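To make "suspend without blocking a thread" more concrete, here is a toy model in plain C++. It is not Flow code and does not reflect how the Flow runtime is implemented; EventLoop, ToyFuture, and runDemo are hypothetical names used only for illustration. The sketch shows the general idea: the code after a wait becomes a continuation that runs when the future is fulfilled, and the single thread is free to do other work in between.

```cpp
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Toy single-threaded event loop: runs queued callbacks in FIFO order.
struct EventLoop {
    std::queue<std::function<void()>> tasks;
    void post(std::function<void()> task) { tasks.push(std::move(task)); }
    void run() {
        while (!tasks.empty()) {
            std::function<void()> task = std::move(tasks.front());
            tasks.pop();
            task();
        }
    }
};

// Toy future: stores one continuation and fires it when a value is sent.
template <class T>
struct ToyFuture {
    bool ready = false;
    T value{};
    std::function<void(const T&)> continuation;

    void then(std::function<void(const T&)> cb) {
        if (ready) cb(value);
        else continuation = std::move(cb);
    }
    void send(const T& v) {
        ready = true;
        value = v;
        if (continuation) continuation(value);
    }
};

// Returns the order in which the steps ran.
std::vector<std::string> runDemo() {
    EventLoop loop;
    ToyFuture<int> connected;
    std::vector<std::string> log;

    // First half of the "actor": runs up to the point where Flow code would wait.
    loop.post([&] {
        log.push_back("connecting");
        // The code after the wait is registered as a continuation; control
        // then returns to the event loop instead of blocking the thread.
        connected.then([&](const int& code) {
            log.push_back("connected with code " + std::to_string(code));
        });
    });
    // Unrelated work runs while the "actor" is suspended.
    loop.post([&] { log.push_back("other work"); });
    // Simulated I/O completion fulfills the future and resumes the "actor".
    loop.post([&] { connected.send(0); });

    loop.run();
    return log;
}
```

Calling runDemo() yields the steps in the order "connecting", "other work", "connected with code 0", which shows the other task running between the two halves of the suspended function.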
Connecting to FlowMQ
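An actor is a natural place to manage the lifecycle of an MQTT client connection. The MqttClient interface used throughout this guide is an assumed API; adapt the type and method names to the client library you use.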
#include "flow/flow.h"
#include "MqttClient.h" // Assumed Flow MQTT client header
#include "flow/actorcompiler.h" // This must be the last #include
ACTOR Future<Void> connect(MqttClient* client) {
    state MqttConnectionOptions options;
    options.broker = "tcp://localhost:1883";
    options.clientId = "my-flow-client";
    printf("Connecting to FlowMQ...\n");
    wait(client->connect(options));
    printf("Connected!\n");
    return Void();
}
Publishing Messages
Publishing is an asynchronous operation. We create an actor that sends a message and waits for the operation to complete.
ACTOR Future<Void> publish(MqttClient* client, std::string topic, std::string payload) {
    state MqttMessage msg;
    msg.topic = topic;
    msg.payload = payload;
    msg.qos = 1;
    printf("Publishing message...\n");
    wait(client->publish(msg));
    printf("Message published.\n");
    return Void();
}
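For context on what msg.qos = 1 implies on the wire, the following plain C++ sketch encodes an MQTT 3.1.1 PUBLISH packet. It is a minimal illustration, not the code used by the assumed MqttClient or by FlowMQ, and encodeRemainingLength and encodePublish are hypothetical helper names. With QoS 1 the packet carries a packet identifier, and the broker answers with a PUBACK that carries the same identifier; a client would typically treat the publish as complete once that acknowledgement arrives.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Encodes the MQTT "remaining length" field: 7 bits per byte,
// with the high bit set when more bytes follow.
std::vector<uint8_t> encodeRemainingLength(uint32_t length) {
    std::vector<uint8_t> out;
    do {
        uint8_t byte = static_cast<uint8_t>(length % 128);
        length /= 128;
        if (length > 0) byte |= 0x80;
        out.push_back(byte);
    } while (length > 0);
    return out;
}

// Builds a PUBLISH packet (DUP and RETAIN flags left at 0).
// Assumes qos is 0, 1, or 2 and the topic is shorter than 65536 bytes.
std::vector<uint8_t> encodePublish(const std::string& topic,
                                   const std::string& payload,
                                   int qos,
                                   uint16_t packetId) {
    std::vector<uint8_t> body;
    // Variable header: topic name prefixed with a 2-byte big-endian length.
    body.push_back(static_cast<uint8_t>(topic.size() >> 8));
    body.push_back(static_cast<uint8_t>(topic.size() & 0xFF));
    body.insert(body.end(), topic.begin(), topic.end());
    // The packet identifier is present only for QoS 1 and QoS 2.
    if (qos > 0) {
        body.push_back(static_cast<uint8_t>(packetId >> 8));
        body.push_back(static_cast<uint8_t>(packetId & 0xFF));
    }
    // The payload follows the variable header without a length prefix.
    body.insert(body.end(), payload.begin(), payload.end());

    std::vector<uint8_t> packet;
    // Fixed header: packet type 3 (PUBLISH) in the high nibble, QoS in bits 1-2.
    packet.push_back(static_cast<uint8_t>(0x30 | (qos << 1)));
    std::vector<uint8_t> remaining =
        encodeRemainingLength(static_cast<uint32_t>(body.size()));
    packet.insert(packet.end(), remaining.begin(), remaining.end());
    packet.insert(packet.end(), body.begin(), body.end());
    return packet;
}
```

For example, encodePublish("a/b", "hi", 1, 10) produces an 11-byte packet whose first byte is 0x32 (PUBLISH with QoS 1), while the same call with QoS 0 omits the two packet identifier bytes.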
Subscribing and Receiving Messages
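Subscribing to a topic yields a stream of incoming messages. In Flow, a PromiseStream is the sending end of a stream and a FutureStream is the receiving end, so this guide assumes that subscribe hands the caller a FutureStream. An actor can loop and call waitNext on the stream to process messages as they arrive.
ACTOR Future<Void> subscribeAndListen(MqttClient* client, std::string topic) {
    // Assumes subscribe() returns Future<FutureStream<MqttMessage>>.
    state FutureStream<MqttMessage> messages = wait(client->subscribe(topic));
    printf("Subscribed to '%s'. Waiting for messages...\n", topic.c_str());
    loop {
        // waitNext suspends the actor until the next message arrives on the stream.
        MqttMessage msg = waitNext(messages);
        printf("Received message on topic '%s': %s\n", msg.topic.c_str(), msg.payload.c_str());
        // For this example, we stop after one message.
        break;
    }
    return Void();
}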
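The queue-like behavior of a message stream can be sketched in plain C++ as follows. This is a toy model, not Flow's stream implementation; ToyStream, onNext, and runStreamDemo are hypothetical names. It shows the two cases a consumer can hit: a message that arrived before anyone asked for it is buffered, and a consumer that asks before a message arrives is parked until the next send.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy message stream with one sender side (send) and one receiver side (onNext).
template <class T>
class ToyStream {
    std::deque<T> buffered_;          // messages nobody has consumed yet
    std::function<void(T)> waiter_;   // consumer waiting for the next message
public:
    void send(T value) {
        if (waiter_) {
            // Hand the message straight to the waiting consumer.
            std::function<void(T)> w = std::move(waiter_);
            waiter_ = nullptr;
            w(std::move(value));
        } else {
            buffered_.push_back(std::move(value));
        }
    }
    void onNext(std::function<void(T)> cb) {
        if (!buffered_.empty()) {
            T value = std::move(buffered_.front());
            buffered_.pop_front();
            cb(std::move(value));
        } else {
            waiter_ = std::move(cb);
        }
    }
};

// Consumes three messages, re-registering after each one,
// much like a loop that waits on the stream once per iteration.
std::vector<std::string> runStreamDemo() {
    ToyStream<std::string> stream;
    std::vector<std::string> received;

    stream.send("early");  // buffered: no consumer is waiting yet

    std::function<void(std::string)> consume = [&](std::string msg) {
        received.push_back(std::move(msg));
        if (received.size() < 3) stream.onNext(consume);
    };
    stream.onNext(consume);   // receives "early" immediately

    stream.send("second");    // delivered to the parked consumer
    stream.send("third");     // delivered; the consumer then stops
    stream.send("ignored");   // stays buffered, nobody is listening
    return received;
}
```

runStreamDemo() returns the messages "early", "second", and "third" in that order. The fourth message stays in the buffer, which mirrors what happens to messages that arrive after the listener actor in this guide has stopped.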
Full Example
Here is a complete example showing how to structure a Flow application to connect, subscribe, publish, and receive a message.
#include "flow/flow.h"
#include "MqttClient.h" // Assumed Flow MQTT client header
#include "flow/actorcompiler.h" // This must be the last #include
// (Actor definitions for connect, publish, and subscribeAndListen would be here)
ACTOR Future<Void> mqtt_example() {
    state MqttClient client;
    state std::string topic = "flowmq/test/flow";
    // Connect to the broker
    wait(connect(&client));
    // Start listening in the background
    state Future<Void> listener = subscribeAndListen(&client, topic);
    // Give the subscription a moment to be established
    wait(delay(0.1));
    // Publish a message
    wait(publish(&client, topic, "Hello from a Flow-based client!"));
    // Wait for the listener to finish (it will after one message)
    wait(listener);
    // Disconnect
    wait(client.disconnect());
    printf("Disconnected.\n");
    return Void();
}
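// Runs the example, then stops the event loop so that run() returns.
ACTOR Future<Void> runExample() {
    wait(mqtt_example());
    g_network->stop();
    return Void();
}
// Main entry point for a Flow application
int main(int argc, char** argv) {
    // Create the network, which owns the event loop. The newNet2 arguments
    // differ between Flow versions; check the headers you build against.
    g_network = newNet2(NetworkAddress(), false);
    // Start the main actor before entering the event loop. wait() is only
    // valid inside an ACTOR, and actors make progress only while run() executes.
    Future<Void> f = runExample();
    // Run the event loop; this call blocks until stop() is called.
    g_network->run();
    return 0;
}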
Additional Resources
- For more details on the Flow programming language, refer to the official FoundationDB Flow Documentation.