Connect FlowMQ with librdkafka C/C++ SDK
FlowMQ supports the Apache Kafka protocol, enabling you to use your favorite Kafka client libraries to interact with it. This guide demonstrates how to use librdkafka, a C/C++ client for Apache Kafka, to produce and consume messages from FlowMQ.
Prerequisites
Before you start, ensure you have the following:
- A running FlowMQ instance.
- A C/C++ development environment (e.g., GCC or Clang).
- The librdkafka library installed on your system.
Installation
You can build and install librdkafka from source to get the latest features.
# Clone the repository
git clone https://github.com/confluentinc/librdkafka.git
cd librdkafka
# Configure and build
./configure
make
# Install the library
sudo make install
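On Linux you may also need to run sudo ldconfig after installing so the newly installed shared library can be found at runtime. You can then compile your programs against librdkafka by linking with -lrdkafka; for example, assuming your producer code lives in a file named producer.c (the file name here is only an example):
# Refresh the shared library cache (Linux)
sudo ldconfig
# Compile and link against librdkafka
gcc producer.c -o producer -lrdkafka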
Producer: Connecting and Sending Messages
A producer is responsible for sending records to topics in FlowMQ.
Configuration and Connection
First, you need to configure the producer, specifying the address of your FlowMQ broker.
#include <stdio.h>   // fprintf
#include <string.h>  // strlen
#include <librdkafka/rdkafka.h>
// ...
rd_kafka_t *rk; // Producer instance handle
rd_kafka_conf_t *conf; // Configuration object
char errstr[512];
// Create configuration object
conf = rd_kafka_conf_new();
// Set the bootstrap servers
if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%s\n", errstr);
    // handle error
}
// Create producer instance
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
    fprintf(stderr, "%% Failed to create new producer: %s\n", errstr);
    // handle error
}
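Optionally, you can register a delivery report callback on the configuration object before calling rd_kafka_new(), so the client tells you whether each produced message was delivered or failed. A minimal sketch (the callback name dr_msg_cb is just an example):
// Delivery report callback: invoked once per message from rd_kafka_poll()/rd_kafka_flush()
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err)
        fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err));
    else
        fprintf(stderr, "%% Message delivered (%zu bytes, partition %d)\n",
                rkmessage->len, (int)rkmessage->partition);
}

// Register the callback on the configuration object before creating the producer
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);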
Producing a Message
To send a message, you need to specify the topic, partition, message payload, and key.
const char *topic = "my_topic";
const char *key = "message-key";
const char *payload = "Hello FlowMQ with Kafka!";
// Produce a message
rd_kafka_resp_err_t err = rd_kafka_producev(
        rk,
        RD_KAFKA_V_TOPIC(topic),
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),  // Make a copy of the payload
        RD_KAFKA_V_KEY((void*)key, strlen(key)),
        RD_KAFKA_V_VALUE((void*)payload, strlen(payload)),
        RD_KAFKA_V_OPAQUE(NULL),                   // Optional per-message opaque pointer
        RD_KAFKA_V_END);
if (err) {
    fprintf(stderr, "%% Failed to produce to topic %s: %s\n", topic, rd_kafka_err2str(err));
}
// Wait for messages to be delivered
rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */);
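Once the flush has completed (or the timeout has expired), destroy the producer instance to release its resources:
// Destroy the producer instance
rd_kafka_destroy(rk);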
Consumer: Connecting and Receiving Messages
A consumer subscribes to topics and processes the records sent to them.
Configuration and Connection
Similar to the producer, you configure the consumer by setting the bootstrap servers and a group.id.
#include <stdio.h>      // printf, fprintf
#include <inttypes.h>   // PRId64
#include <librdkafka/rdkafka.h>
// ...
rd_kafka_t *rk; // Consumer instance handle
rd_kafka_conf_t *conf; // Configuration object
char errstr[512];
// Create configuration object
conf = rd_kafka_conf_new();
// Set bootstrap servers and group id
if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
    rd_kafka_conf_set(conf, "group.id", "my_consumer_group",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%s\n", errstr);
    // handle error
}
// Create consumer instance
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!rk) {
    fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
    // handle error
}
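You may also want to control where a brand-new consumer group starts reading. This is done with the auto.offset.reset property which, like the other settings, must be set on the configuration object before the consumer is created; the value earliest shown below is just one option (the default, latest, reads only new messages):
// Start from the beginning of the topic when the group has no committed offsets.
// (Set this on `conf` before calling rd_kafka_new().)
if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%s\n", errstr);
    // handle error
}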
Subscribing and Consuming
After creating a consumer, subscribe it to a list of topics and then enter a loop to poll for messages.
const char *topic = "my_topic";
rd_kafka_topic_partition_list_t *topics;
// Create topic partition list and add topic
topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);
// Subscribe to topics (check the return code in real applications)
rd_kafka_resp_err_t err = rd_kafka_subscribe(rk, topics);
if (err)
    fprintf(stderr, "%% Failed to subscribe: %s\n", rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(topics);
// Consumption loop: poll indefinitely; in a real application, break out of
// this loop on shutdown (see the signal-handling sketch below) so the
// cleanup code that follows can run.
while (1) {
    rd_kafka_message_t *msg;
    msg = rd_kafka_consumer_poll(rk, 1000); // Poll for up to 1 second
    if (msg) {
        if (msg->err) {
            fprintf(stderr, "%% Consume error: %s\n", rd_kafka_message_errstr(msg));
        } else {
            printf("%% Message received from topic %s, partition %d at offset %" PRId64 ":\n",
                   rd_kafka_topic_name(msg->rkt), msg->partition, msg->offset);
            printf("Key: %.*s\n", (int)msg->key_len, (char *)msg->key);
            printf("Payload: %.*s\n", (int)msg->len, (char *)msg->payload);
        }
        rd_kafka_message_destroy(msg);
    }
}
// Close the consumer and destroy the instance
rd_kafka_consumer_close(rk);
rd_kafka_destroy(rk);
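The loop above never terminates on its own, so the close and destroy calls are only reached if you break out of it. One common pattern, sketched below under the assumption of a POSIX environment, is to flip a flag from a signal handler and loop on that flag instead of while (1):
#include <signal.h>

// Flag flipped by the signal handler to request a clean shutdown
static volatile sig_atomic_t run = 1;

static void stop(int sig) {
    run = 0;
}

// ... inside main(), before the consumption loop:
signal(SIGINT, stop);

// ... and loop on the flag instead of `while (1)`:
while (run) {
    // poll and process messages as shown above
}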
Additional Resources
- For more in-depth information, refer to the official librdkafka repository and its examples.
- Explore FlowMQ's advanced features for Kafka integration.