Connect FlowMQ with the C++ SDK
The C++ binding of FlowSD SDK is the reference implementation, written in the Flow programming language, and offers the highest performance and most direct access to FlowMQ's features. It is built on an actor-based concurrency model and uses Future<T>
for all asynchronous operations.
Prerequisites
- C++11 or newer: A compatible C++ compiler (e.g., GCC, Clang, MSVC).
- CMake 3.15+: For building the SDK and your application.
- Flow Library: Your project must be ableto link against the
flow
library, which provides the core actor framework. - Running FlowMQ Instance: Access to a FlowMQ broker, assumed to be at
localhost:7070
.
Installation & Project Setup
Integrating the Flow SDK typically involves including it as a subdirectory in your CMake project or using a package manager like Conan if available. For this guide, we'll assume you are using CMake
and FetchContent
.
CMake FetchContent
Example
Add the following to your CMakeLists.txt
:
cmake_minimum_required(VERSION 3.15)
project(MyFlowMQApp CXX)
include(FetchContent)
# Add dependencies for Flow and the SDK
FetchContent_Declare(
flow
GIT_REPOSITORY https://github.com/apple/foundationdb.git # Or the specific Flow repo
GIT_TAG <FLOW_VERSION>
)
FetchContent_Declare(
flowmq_sdk
GIT_REPOSITORY https://github.com/flowmq/flowmq-sdk-cpp.git
GIT_TAG <SDK_VERSION>
)
FetchContent_MakeAvailable(flow flowmq_sdk)
add_executable(my_app main.cpp)
# Link against the SDK
target_link_libraries(my_app PRIVATE flowmq::sdk)
Core Concepts
INetwork
and theflow
event loop: The foundation of any Flow application. You must run theg_network
event loop.FlowClient
: The main client object, created via a factory function.ClientConfig
: A struct for configuring theFlowClient
.Producer
: An object created from theFlowClient
for publishing messages.Consumer
: An object responsible for receiving messages.Message
: A struct representing a message.Future<T>
: The core primitive for all asynchronous operations. Every network call returns aFuture
.ACTOR
: The macro used to define asynchronous functions (actors) that canwait()
onFuture
s without blocking the thread.
Establishing a Connection
A Flow application is structured around actors. Your main logic will be inside an actor that is started by the Flow runtime.
Configuration and Creation
#define FDB_API_VERSION 710 // Use the appropriate API version
#include "flow/flow.h"
#include "flow/Net2.h"
#include "flowmq/sdk/FlowClient.h"
#include "flowmq/sdk/Authentication.h"
// An actor is an async function
ACTOR Future<Void> connectionActor() {
state ClientConfig config;
config.brokerAddress = "localhost:7070";
config.auth = Authentication::token("YOUR_API_TOKEN");
// Creating a client is an async operation
state Reference<FlowClient> client = wait(FlowClient::create(config));
printf("Successfully connected to FlowMQ!\n");
// The client will be automatically managed by its Reference<T>
return Void();
}
int main(int argc, char** argv) {
g_network = newNet2(Net2::Options());
// Run the main actor
g_network->run();
return 0;
}
Publishing Messages
Publishing is an asynchronous operation that returns a Future<PublishReceipt>
. You wait
for this Future
to complete within an actor.
Example: Publishing a Message to a Topic
#include "flowmq/sdk/Producer.h"
#include "flowmq/sdk/Message.h"
#include "flowmq/sdk/PublishReceipt.h"
ACTOR Future<Void> publishActor(Reference<FlowClient> client) {
// 1. Create a producer
state Reference<Producer> producer = client->newProducer();
// 2. Construct a message
state Message msg;
msg.payload = "Hello from C++!";
msg.headers["content-type"] = "text/plain";
// 3. Publish and wait for the result
try {
PublishReceipt receipt = wait(producer->publish("sensor.v1.data", msg));
printf("Message published successfully! ID: %s\n", receipt.messageId.c_str());
} catch (Error& e) {
printf("Failed to publish message: %s\n", e.what());
}
return Void();
}
Consuming Messages
You consume messages by providing a handler, which is an std::function
. The handler receives a Message
and is expected to return a Future<Void>
.
Consuming from a Queue
When consuming from a queue, the handler must return the Future
from message.ack()
.
#include "flowmq/sdk/Queue.h"
ACTOR Future<Void> queueConsumerActor(Reference<FlowClient> client) {
state Queue taskQueue("image.processing.tasks");
printf("Starting consumer on queue: %s...\n", taskQueue.name.c_str());
// Define the handler as a lambda
auto handler = [](Message message) -> Future<Void> {
printf("Received from queue: %s\n", message.payload.toString().c_str());
// Process message...
printf("Processing complete. Acknowledging message.\n");
return message.ack();
};
// The consume actor runs until its returned Future is cancelled.
Future<Void> consumer = client->consume(taskQueue, handler);
// For this example, wait on a delay to allow messages to arrive
wait(delay(30.0));
consumer.cancel(); // Stop the consumer
return Void();
}
Consuming from a Stream
When consuming from a stream, the handler must return the Future
from message.commitOffset()
.
#include "flowmq/sdk/Stream.h"
ACTOR Future<Void> streamConsumerActor(Reference<FlowClient> client) {
state Stream eventStream("user.activity.events", StartPosition::Earliest);
printf("Starting consumer on stream: %s...\n", eventStream.name.c_str());
auto handler = [](Message message) -> Future<Void> {
printf("Received from stream (offset %lld): %s\n", message.offset, message.payload.toString().c_str());
// Process message...
printf("Processing complete. Committing offset.\n");
return message.commitOffset();
};
Future<Void> consumer = client->consume(eventStream, handler);
wait(delay(30.0));
consumer.cancel();
return Void();
}
Full End-to-End Example
#define FDB_API_VERSION 710
#include "flow/flow.h"
#include "flow/Net2.h"
#include "flowmq/sdk/FlowClient.h"
#include "flowmq/sdk/Authentication.h"
#include "flowmq/sdk/Producer.h"
#include "flowmq/sdk/Message.h"
#include "flowmq/sdk/Queue.h"
ACTOR Future<Void> mainActor() {
state ClientConfig config;
config.brokerAddress = "localhost:7070";
config.auth = Authentication::token("YOUR_API_TOKEN");
state Reference<FlowClient> client = wait(FlowClient::create(config));
// 1. Consumer setup
state Queue e2eQueue("e2e.queue");
state Promise<Void> messageReceivedPromise;
auto handler = [=](Message message) mutable -> Future<Void> {
printf("Received message: %s\n", message.payload.toString().c_str());
messageReceivedPromise.send(Void());
return message.ack();
};
state Future<Void> consumer = client->consume(e2eQueue, handler);
printf("Consumer started and waiting for messages...\n");
// 2. Producer setup
state Reference<Producer> producer = client->newProducer();
state Message outMessage;
outMessage.payload = "End-to-End Test Message!";
// 3. Publish and wait for receipt
wait(producer->publish("e2e.topic", outMessage));
printf("Message published successfully.\n");
// 4. Wait for the consumer to receive the message
wait(messageReceivedPromise.getFuture());
printf("Successfully received and acknowledged message.\n");
consumer.cancel();
return Void();
}
int main(int argc, char** argv) {
g_network = newNet2(Net2::Options());
try {
g_network->run();
} catch (Error& e) {
fprintf(stderr, "Error: %s\n", e.what());
return 1;
}
return 0;
}
Additional Resources
- Error Handling: Use
try...catch
blocks within actors to handle exceptions fromwait()
statements. - Actor Programming: Master the Flow actor programming model for building complex, concurrent applications.
- Flow SDK Overview: For a broader understanding of concepts, refer to the Flow SDK Developer Guide.