Connect FlowMQ with MQTT Flow Client

The Flow programming language brings actor-based concurrency to C++11, which makes it well suited to building high-performance, asynchronous systems. It was developed for, and is heavily used in, the FoundationDB project. This guide demonstrates how to use an MQTT client written in Flow to connect to FlowMQ, publish messages, and subscribe to topics.

Flow's asynchronous model, with its actor, state, and wait primitives, provides a natural and powerful way to handle the non-blocking, event-driven nature of the MQTT protocol.

Prerequisites

Before you begin, ensure you have the following:

  • A running instance of FlowMQ.
  • A C++11 compatible compiler (e.g., GCC, Clang).
  • The Flow actor compiler, which translates Flow source into standard C++, and the Flow libraries set up in your development environment.

The Actor Model

In Flow, asynchronous operations are defined within an actor. An actor is a function, marked with the ACTOR keyword, that can suspend its execution without blocking a thread, typically while waiting for an I/O operation to complete. Calling wait() on a Future suspends the actor until the asynchronous operation that the Future represents has completed. Local variables that must survive across a wait() are declared with the state keyword.
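To make the suspend-and-resume idea concrete without requiring the Flow toolchain, here is a minimal sketch in standard C++. The names `SharedState`, `MiniPromise`, `MiniFuture`, and `runFutureDemo` are hypothetical and are not Flow's real classes. The sketch only illustrates the general technique of registering a continuation on a future instead of blocking the thread.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-ins for a promise/future pair (not Flow's API).
// State shared by both sides: the value (once set) and the code to resume.
template <class T>
struct SharedState {
    bool ready = false;
    T value{};
    std::function<void(const T&)> continuation;
};

template <class T>
class MiniFuture {
public:
    explicit MiniFuture(std::shared_ptr<SharedState<T>> s) : state_(std::move(s)) {}
    bool isReady() const { return state_->ready; }
    // Register the code that should run once the value exists.
    // If the value is already there, run it immediately.
    void then(std::function<void(const T&)> k) {
        if (state_->ready) k(state_->value);
        else state_->continuation = std::move(k);
    }
private:
    std::shared_ptr<SharedState<T>> state_;
};

template <class T>
class MiniPromise {
public:
    MiniPromise() : state_(std::make_shared<SharedState<T>>()) {}
    MiniFuture<T> getFuture() const { return MiniFuture<T>(state_); }
    // Fulfil the future and resume whoever registered a continuation.
    void send(const T& v) {
        assert(!state_->ready);  // a promise is fulfilled at most once
        state_->value = v;
        state_->ready = true;
        if (state_->continuation) state_->continuation(state_->value);
    }
private:
    std::shared_ptr<SharedState<T>> state_;
};

// Records the order of events around a "wait".
std::vector<std::string> runFutureDemo() {
    std::vector<std::string> log;
    MiniPromise<int> promise;
    MiniFuture<int> future = promise.getFuture();

    log.push_back("actor starts");
    future.then([&log](const int& v) {  // roughly: int v = wait(future);
        log.push_back("actor resumes with " + std::to_string(v));
    });
    log.push_back("caller keeps running");  // the thread was never blocked

    promise.send(42);  // stands in for an I/O completion
    return log;
}
```

Calling `runFutureDemo()` yields the entries "actor starts", "caller keeps running", and "actor resumes with 42", in that order. The code after the wait point runs only once the value arrives, and the caller is free to continue in the meantime.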

Connecting to FlowMQ

An actor is a natural place to manage the lifecycle of an MQTT client connection, because it can wait for the broker's response without blocking the thread.

cpp
#include "flow/flow.h"
#include "MqttClient.h" // Assumed Flow MQTT client header
#include "flow/actorcompiler.h" // This must be the last #include

ACTOR Future<Void> connect(MqttClient* client) {
    state MqttConnectionOptions options;
    options.broker = "tcp://localhost:1883";
    options.clientId = "my-flow-client";
    
    printf("Connecting to FlowMQ...\n");
    wait(client->connect(options));
    printf("Connected!\n");
    
    return Void();
}
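The broker address above is a URI of the form scheme://host:port. As an illustration of what a client has to extract from it, the following standard C++ sketch splits such a string into its parts. `BrokerEndpoint` and `parseBrokerUri` are hypothetical names introduced here and are not part of any MQTT client library. The sketch assumes a hostname or IPv4 address (no bracketed IPv6 literals) and falls back to 1883, the standard port for unencrypted MQTT, when no port is given.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical helper type; not part of any MQTT client library.
struct BrokerEndpoint {
    std::string scheme;
    std::string host;
    uint16_t port;
};

// Splits "scheme://host[:port]" into its parts.
// Throws std::invalid_argument (or std::out_of_range from std::stoi) on bad input.
BrokerEndpoint parseBrokerUri(const std::string& uri) {
    const std::string sep = "://";
    const std::size_t schemeEnd = uri.find(sep);
    if (schemeEnd == std::string::npos || schemeEnd == 0)
        throw std::invalid_argument("missing scheme in broker URI: " + uri);

    BrokerEndpoint ep;
    ep.scheme = uri.substr(0, schemeEnd);
    assert(!ep.scheme.empty());
    const std::string rest = uri.substr(schemeEnd + sep.size());

    const std::size_t colon = rest.rfind(':');
    if (colon == std::string::npos) {
        ep.host = rest;
        ep.port = 1883;  // default port for unencrypted MQTT
    } else {
        ep.host = rest.substr(0, colon);
        const int port = std::stoi(rest.substr(colon + 1));  // throws on non-numeric input
        if (port < 1 || port > 65535)
            throw std::invalid_argument("port out of range in broker URI: " + uri);
        ep.port = static_cast<uint16_t>(port);
    }
    if (ep.host.empty())
        throw std::invalid_argument("missing host in broker URI: " + uri);
    return ep;
}
```

For example, `parseBrokerUri("tcp://localhost:1883")` produces the scheme `tcp`, the host `localhost`, and the port 1883.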

Publishing Messages

Publishing is an asynchronous operation. We create an actor that sends a message and waits for the operation to complete.

cpp
ACTOR Future<Void> publish(MqttClient* client, std::string topic, std::string payload) {
    state MqttMessage msg;
    msg.topic = topic;
    msg.payload = payload;
    msg.qos = 1;

    printf("Publishing message...\n");
    wait(client->publish(msg));
    printf("Message published.\n");

    return Void();
}
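For readers curious about what a publish call puts on the wire, here is a sketch in standard C++ of how an MQTT 3.1.1 PUBLISH packet is laid out. The helper names `encodeRemainingLength` and `encodePublish` are hypothetical. A real client would also manage packet identifiers and wait for the broker's PUBACK at QoS 1, which this sketch does not do.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Encode the MQTT "remaining length" field: 7 bits per byte, with the
// high bit set on every byte except the last.
std::vector<uint8_t> encodeRemainingLength(uint32_t len) {
    std::vector<uint8_t> out;
    do {
        uint8_t byte = static_cast<uint8_t>(len % 128);
        len /= 128;
        if (len > 0) byte |= 0x80;  // continuation bit
        out.push_back(byte);
    } while (len > 0);
    return out;
}

// Build an MQTT 3.1.1 PUBLISH packet with DUP=0 and RETAIN=0.
// packetId is only written for QoS 1 and 2.
std::vector<uint8_t> encodePublish(const std::string& topic, const std::string& payload,
                                   int qos, uint16_t packetId) {
    assert(qos >= 0 && qos <= 2);
    assert(topic.size() <= 0xFFFF);

    // Variable header and payload
    std::vector<uint8_t> body;
    body.push_back(static_cast<uint8_t>(topic.size() >> 8));    // topic length, high byte
    body.push_back(static_cast<uint8_t>(topic.size() & 0xFF));  // topic length, low byte
    body.insert(body.end(), topic.begin(), topic.end());
    if (qos > 0) {
        body.push_back(static_cast<uint8_t>(packetId >> 8));
        body.push_back(static_cast<uint8_t>(packetId & 0xFF));
    }
    body.insert(body.end(), payload.begin(), payload.end());

    // Fixed header: packet type 3 (PUBLISH) in the high nibble, QoS in bits 1-2
    std::vector<uint8_t> packet;
    packet.push_back(static_cast<uint8_t>(0x30 | (qos << 1)));
    const std::vector<uint8_t> len = encodeRemainingLength(static_cast<uint32_t>(body.size()));
    packet.insert(packet.end(), len.begin(), len.end());
    packet.insert(packet.end(), body.begin(), body.end());
    return packet;
}
```

With QoS 1, `encodePublish("a/b", "hi", 1, 10)` produces an 11-byte packet whose first byte is 0x32 and whose remaining-length byte is 9.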

Subscribing and Receiving Messages

Subscribing to a topic returns a FutureStream, the receiving end of a PromiseStream. Where a Future carries a single value, a stream carries a sequence of them, so an actor can loop and call waitNext() on the stream to process messages as they arrive.

cpp
ACTOR Future<Void> subscribeAndListen(MqttClient* client, std::string topic) {
    // Assumes subscribe() resolves to the receiving end of the message stream
    state FutureStream<MqttMessage> messages = wait(client->subscribe(topic));
    printf("Subscribed to '%s'. Waiting for messages...\n", topic.c_str());

    loop {
        // waitNext() suspends the actor until the next message arrives
        MqttMessage msg = waitNext(messages);
        printf("Received message on topic '%s': %s\n", msg.topic.c_str(), msg.payload.c_str());
        // For this example, we stop after one message.
        break;
    }

    return Void();
}
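The stream behaviour described above (buffer messages until a consumer asks for one, hand them over directly when a consumer is already waiting) can be sketched in standard C++ as follows. `MiniStream` and `runStreamDemo` are hypothetical names, and the sketch assumes a single consumer on a single thread. It is an illustration of the idea, not how Flow implements its streams.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-consumer message stream (not Flow's implementation).
template <class T>
class MiniStream {
public:
    // Producer side: hand the value to a waiting consumer, or buffer it.
    void send(const T& v) {
        if (waiter_) {
            std::function<void(const T&)> k = std::move(waiter_);
            waiter_ = nullptr;  // each registration receives exactly one value
            k(v);
        } else {
            buffer_.push_back(v);
        }
    }
    // Consumer side: ask for the next value; k runs once one is available.
    void next(std::function<void(const T&)> k) {
        if (!buffer_.empty()) {
            T v = buffer_.front();
            buffer_.pop_front();
            k(v);
        } else {
            assert(!waiter_);  // only one pending consumer is supported
            waiter_ = std::move(k);
        }
    }
private:
    std::deque<T> buffer_;
    std::function<void(const T&)> waiter_;
};

// A consumer "loop" that handles three messages and then stops listening.
std::vector<std::string> runStreamDemo() {
    MiniStream<std::string> stream;
    std::vector<std::string> received;

    stream.send("early");  // no consumer yet, so this is buffered

    std::function<void(const std::string&)> onMessage;
    onMessage = [&stream, &received, &onMessage](const std::string& m) {
        received.push_back(m);
        if (received.size() < 3) stream.next(onMessage);  // wait for the next one
    };
    stream.next(onMessage);  // receives "early" at once, then waits again

    stream.send("second");
    stream.send("third");
    stream.send("ignored");  // buffered; the consumer has stopped listening
    return received;
}
```

`runStreamDemo()` returns "early", "second", and "third". The fourth message stays in the buffer because the consumer does not ask for another value.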

Full Example

Here is a complete example showing how to structure a Flow application to connect, subscribe, publish, and receive a message.

cpp
#include "flow/flow.h"
#include "MqttClient.h"
#include "flow/actorcompiler.h" // This must be the last #include

// (Actor definitions for connect, publish, and subscribeAndListen would be here)

ACTOR Future<Void> mqtt_example() {
    state MqttClient client;
    state std::string topic = "flowmq/test/flow";

    // Connect to the broker
    wait(connect(&client));

    // Start listening in the background
    state Future<Void> listener = subscribeAndListen(&client, topic);

    // Give the subscription a moment to be established
    wait(delay(0.1));

    // Publish a message
    wait(publish(&client, topic, "Hello from a Flow-based client!"));

    // Wait for the listener to finish (it will after one message)
    wait(listener);

    // Disconnect
    wait(client.disconnect());
    printf("Disconnected.\n");

    return Void();
}

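// Runs the example, then stops the event loop so that main() can return
ACTOR Future<Void> runExample() {
    try {
        wait(mqtt_example());
    } catch (Error& e) {
        printf("Example failed: %s\n", e.what());
    }
    g_network->stop();
    return Void();
}

// Main entry point for a Flow application
int main(int argc, char** argv) {
    // Create the network and install it as the global event loop
    g_network = newNet2(NetworkAddress(), false);

    // Start the main actor; it runs until its first wait() and resumes
    // once the event loop is running
    Future<Void> f = runExample();

    // Run the event loop; this blocks until g_network->stop() is called
    g_network->run();
    return 0;
}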
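The relationship between actors and the network's event loop can be illustrated with a small standard C++ sketch. `MiniLoop` and `runLoopDemo` are hypothetical names. Unlike a real network loop, which also waits for I/O and timers, this one simply returns when it is stopped or runs out of queued work. What it does show is that work has to be scheduled before run() is entered, that run() keeps control until stop() is called, and that each step schedules the next one.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded event loop (a stand-in, not Flow's INetwork).
class MiniLoop {
public:
    void post(std::function<void()> task) { tasks_.push_back(std::move(task)); }
    void stop() { stopped_ = true; }
    // Runs queued tasks in order until stop() is called or no work remains.
    void run() {
        while (!stopped_ && !tasks_.empty()) {
            std::function<void()> task = std::move(tasks_.front());
            tasks_.pop_front();
            task();
        }
    }
private:
    std::deque<std::function<void()>> tasks_;
    bool stopped_ = false;
};

std::vector<std::string> runLoopDemo() {
    MiniLoop loop;
    std::vector<std::string> log;

    // Schedule the first step before entering the loop; each step schedules the next.
    loop.post([&loop, &log] {
        log.push_back("connect");
        loop.post([&loop, &log] {
            log.push_back("publish");
            loop.post([&loop, &log] {
                log.push_back("disconnect");
                loop.post([&log] { log.push_back("never runs"); });
                loop.stop();  // the last step ends the loop
            });
        });
    });

    loop.run();  // does not return until stop() has been called
    log.push_back("run() returned");
    return log;
}
```

`runLoopDemo()` returns "connect", "publish", "disconnect", and "run() returned". The task queued just before stop() is never executed.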

Additional Resources