Skip to content

Connect FlowMQ with the C++ SDK

The C++ binding of FlowSD SDK is the reference implementation, written in the Flow programming language, and offers the highest performance and most direct access to FlowMQ's features. It is built on an actor-based concurrency model and uses Future<T> for all asynchronous operations.

Prerequisites

  • C++11 or newer: A compatible C++ compiler (e.g., GCC, Clang, MSVC).
  • CMake 3.15+: For building the SDK and your application.
  • Flow Library: Your project must be ableto link against the flow library, which provides the core actor framework.
  • Running FlowMQ Instance: Access to a FlowMQ broker, assumed to be at localhost:7070.

Installation & Project Setup

Integrating the Flow SDK typically involves including it as a subdirectory in your CMake project or using a package manager like Conan if available. For this guide, we'll assume you are using CMake and FetchContent.

CMake FetchContent Example

Add the following to your CMakeLists.txt:

cmake
cmake_minimum_required(VERSION 3.15)
project(MyFlowMQApp CXX)

include(FetchContent)

# Add dependencies for Flow and the SDK
FetchContent_Declare(
  flow
  GIT_REPOSITORY https://github.com/apple/foundationdb.git # Or the specific Flow repo
  GIT_TAG <FLOW_VERSION>
)
FetchContent_Declare(
  flowmq_sdk
  GIT_REPOSITORY https://github.com/flowmq/flowmq-sdk-cpp.git
  GIT_TAG <SDK_VERSION>
)

FetchContent_MakeAvailable(flow flowmq_sdk)

add_executable(my_app main.cpp)

# Link against the SDK
target_link_libraries(my_app PRIVATE flowmq::sdk)

Core Concepts

  • INetwork and the flow event loop: The foundation of any Flow application. You must run the g_network event loop.
  • FlowClient: The main client object, created via a factory function.
  • ClientConfig: A struct for configuring the FlowClient.
  • Producer: An object created from the FlowClient for publishing messages.
  • Consumer: An object responsible for receiving messages.
  • Message: A struct representing a message.
  • Future<T>: The core primitive for all asynchronous operations. Every network call returns a Future.
  • ACTOR: The macro used to define asynchronous functions (actors) that can wait() on Futures without blocking the thread.

Establishing a Connection

A Flow application is structured around actors. Your main logic will be inside an actor that is started by the Flow runtime.

Configuration and Creation

cpp
#define FDB_API_VERSION 710 // Use the appropriate API version
#include "flow/flow.h"
#include "flow/Net2.h"
#include "flowmq/sdk/FlowClient.h"
#include "flowmq/sdk/Authentication.h"

// An actor is an async function
ACTOR Future<Void> connectionActor() {
    state ClientConfig config;
    config.brokerAddress = "localhost:7070";
    config.auth = Authentication::token("YOUR_API_TOKEN");

    // Creating a client is an async operation
    state Reference<FlowClient> client = wait(FlowClient::create(config));

    printf("Successfully connected to FlowMQ!\n");
    
    // The client will be automatically managed by its Reference<T>
    return Void();
}

int main(int argc, char** argv) {
    g_network = newNet2(Net2::Options());
    
    // Run the main actor
    g_network->run(); 
    
    return 0;
}

Publishing Messages

Publishing is an asynchronous operation that returns a Future<PublishReceipt>. You wait for this Future to complete within an actor.

Example: Publishing a Message to a Topic

cpp
#include "flowmq/sdk/Producer.h"
#include "flowmq/sdk/Message.h"
#include "flowmq/sdk/PublishReceipt.h"

ACTOR Future<Void> publishActor(Reference<FlowClient> client) {
    // 1. Create a producer
    state Reference<Producer> producer = client->newProducer();

    // 2. Construct a message
    state Message msg;
    msg.payload = "Hello from C++!";
    msg.headers["content-type"] = "text/plain";

    // 3. Publish and wait for the result
    try {
        PublishReceipt receipt = wait(producer->publish("sensor.v1.data", msg));
        printf("Message published successfully! ID: %s\n", receipt.messageId.c_str());
    } catch (Error& e) {
        printf("Failed to publish message: %s\n", e.what());
    }
    
    return Void();
}

Consuming Messages

You consume messages by providing a handler, which is an std::function. The handler receives a Message and is expected to return a Future<Void>.

Consuming from a Queue

When consuming from a queue, the handler must return the Future from message.ack().

cpp
#include "flowmq/sdk/Queue.h"

ACTOR Future<Void> queueConsumerActor(Reference<FlowClient> client) {
    state Queue taskQueue("image.processing.tasks");

    printf("Starting consumer on queue: %s...\n", taskQueue.name.c_str());

    // Define the handler as a lambda
    auto handler = [](Message message) -> Future<Void> {
        printf("Received from queue: %s\n", message.payload.toString().c_str());
        // Process message...
        printf("Processing complete. Acknowledging message.\n");
        return message.ack();
    };

    // The consume actor runs until its returned Future is cancelled.
    Future<Void> consumer = client->consume(taskQueue, handler);
    
    // For this example, wait on a delay to allow messages to arrive
    wait(delay(30.0)); 
    consumer.cancel(); // Stop the consumer

    return Void();
}

Consuming from a Stream

When consuming from a stream, the handler must return the Future from message.commitOffset().

cpp
#include "flowmq/sdk/Stream.h"

ACTOR Future<Void> streamConsumerActor(Reference<FlowClient> client) {
    state Stream eventStream("user.activity.events", StartPosition::Earliest);

    printf("Starting consumer on stream: %s...\n", eventStream.name.c_str());
    
    auto handler = [](Message message) -> Future<Void> {
        printf("Received from stream (offset %lld): %s\n", message.offset, message.payload.toString().c_str());
        // Process message...
        printf("Processing complete. Committing offset.\n");
        return message.commitOffset();
    };

    Future<Void> consumer = client->consume(eventStream, handler);
    wait(delay(30.0));
    consumer.cancel();

    return Void();
}

Full End-to-End Example

cpp
#define FDB_API_VERSION 710
#include "flow/flow.h"
#include "flow/Net2.h"
#include "flowmq/sdk/FlowClient.h"
#include "flowmq/sdk/Authentication.h"
#include "flowmq/sdk/Producer.h"
#include "flowmq/sdk/Message.h"
#include "flowmq/sdk/Queue.h"

ACTOR Future<Void> mainActor() {
    state ClientConfig config;
    config.brokerAddress = "localhost:7070";
    config.auth = Authentication::token("YOUR_API_TOKEN");
    
    state Reference<FlowClient> client = wait(FlowClient::create(config));

    // 1. Consumer setup
    state Queue e2eQueue("e2e.queue");
    state Promise<Void> messageReceivedPromise;

    auto handler = [=](Message message) mutable -> Future<Void> {
        printf("Received message: %s\n", message.payload.toString().c_str());
        messageReceivedPromise.send(Void());
        return message.ack();
    };

    state Future<Void> consumer = client->consume(e2eQueue, handler);
    printf("Consumer started and waiting for messages...\n");

    // 2. Producer setup
    state Reference<Producer> producer = client->newProducer();
    state Message outMessage;
    outMessage.payload = "End-to-End Test Message!";

    // 3. Publish and wait for receipt
    wait(producer->publish("e2e.topic", outMessage));
    printf("Message published successfully.\n");
    
    // 4. Wait for the consumer to receive the message
    wait(messageReceivedPromise.getFuture());
    printf("Successfully received and acknowledged message.\n");

    consumer.cancel();
    return Void();
}

int main(int argc, char** argv) {
    g_network = newNet2(Net2::Options());
    
    try {
        g_network->run();
    } catch (Error& e) {
        fprintf(stderr, "Error: %s\n", e.what());
        return 1;
    }
    return 0;
}

Additional Resources

  • Error Handling: Use try...catch blocks within actors to handle exceptions from wait() statements.
  • Actor Programming: Master the Flow actor programming model for building complex, concurrent applications.
  • Flow SDK Overview: For a broader understanding of concepts, refer to the Flow SDK Developer Guide.