Connect FlowMQ with librdkafka C/C++ SDK

FlowMQ supports the Apache Kafka protocol, enabling you to use your favorite Kafka client libraries to interact with it. This guide demonstrates how to use librdkafka, a C/C++ client for Apache Kafka, to produce and consume messages from FlowMQ.

Prerequisites

Before you start, ensure you have the following:

  • A running FlowMQ instance.
  • A C/C++ development environment (e.g., GCC or Clang).
  • The librdkafka library installed on your system.

Installation

You can build and install librdkafka from source to get the latest features.

bash
# Clone the repository
git clone https://github.com/confluentinc/librdkafka.git
cd librdkafka

# Configure and build
./configure
make

# Install the library
sudo make install

# On Linux, refresh the shared library cache so the new library is found
sudo ldconfig

Producer: Connecting and Sending Messages

A producer is responsible for sending records to topics in FlowMQ.

Configuration and Connection

First, you need to configure the producer, specifying the address of your FlowMQ broker.

c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

// ...

rd_kafka_t *rk; // Producer instance handle
rd_kafka_conf_t *conf; // Configuration object
char errstr[512];

// Create configuration object
conf = rd_kafka_conf_new();

// Set the bootstrap servers
if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%s\n", errstr);
    // handle error
}

// Create producer instance (on success, rk takes ownership of conf)
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
    fprintf(stderr, "%% Failed to create new producer: %s\n", errstr);
    // handle error
}

Producing a Message

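To send a message, specify the topic, the payload, and optionally a key. This example does not set a partition, so librdkafka's partitioner chooses one, typically based on the key. Note that rd_kafka_producev() only enqueues the message; delivery to FlowMQ happens asynchronously in the background.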

c
const char *topic = "my_topic";
const char *key = "message-key";
const char *payload = "Hello FlowMQ with Kafka!";

// Produce a message
rd_kafka_resp_err_t err = rd_kafka_producev(
    rk,
    RD_KAFKA_V_TOPIC(topic),
    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), // Make a copy of the payload
    RD_KAFKA_V_KEY((void*)key, strlen(key)),
    RD_KAFKA_V_VALUE((void*)payload, strlen(payload)),
    RD_KAFKA_V_OPAQUE(NULL), // Optional per-message opaque pointer
    RD_KAFKA_V_END
);

if (err) {
    fprintf(stderr, "%% Failed to produce to topic %s: %s\n", topic, rd_kafka_err2str(err));
}

// Wait up to 10 seconds for outstanding messages to be delivered
rd_kafka_flush(rk, 10 * 1000);

// Destroy the producer instance once it is no longer needed
rd_kafka_destroy(rk);

Consumer: Connecting and Receiving Messages

A consumer subscribes to topics and processes the records sent to them.

Configuration and Connection

Similar to the producer, you configure the consumer by setting the bootstrap servers and a group.id.

c
#include <inttypes.h> // PRId32, PRId64
#include <stdio.h>
#include <librdkafka/rdkafka.h>

// ...

rd_kafka_t *rk; // Consumer instance handle
rd_kafka_conf_t *conf; // Configuration object
char errstr[512];

// Create configuration object
conf = rd_kafka_conf_new();

// Set bootstrap servers and group id
if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
    rd_kafka_conf_set(conf, "group.id", "my_consumer_group", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%s\n", errstr);
    // handle error
}

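// Create consumer instance (on success, rk takes ownership of conf)
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!rk) {
    fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
    // handle error
}

// Forward all events to the consumer queue served by rd_kafka_consumer_poll()
rd_kafka_poll_set_consumer(rk);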

Subscribing and Consuming

After creating a consumer, subscribe it to a list of topics and then enter a loop to poll for messages.

c
const char *topic = "my_topic";
rd_kafka_topic_partition_list_t *topics;

// Create topic partition list and add topic
topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);

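// Subscribe to topics and check the result
rd_kafka_resp_err_t err = rd_kafka_subscribe(rk, topics);
rd_kafka_topic_partition_list_destroy(topics);
if (err) {
    fprintf(stderr, "%% Failed to subscribe to %s: %s\n", topic, rd_kafka_err2str(err));
    // handle error
}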

// Consumption loop (as written, it runs until the process is terminated)
while (1) {
    rd_kafka_message_t *msg;
    msg = rd_kafka_consumer_poll(rk, 1000); // Poll for 1 second

    if (msg) {
        if (msg->err) {
            fprintf(stderr, "%% Consume error: %s\n", rd_kafka_message_errstr(msg));
        } else {
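            // partition is int32_t and offset is int64_t: use the PRId32/PRId64 macros from <inttypes.h>
            printf("%% Message received from topic %s, partition %" PRId32 " at offset %" PRId64 ":\n",
                   rd_kafka_topic_name(msg->rkt), msg->partition, msg->offset);
            // The key is optional and may be NULL
            if (msg->key)
                printf("Key: %.*s\n", (int)msg->key_len, (const char *)msg->key);
            printf("Payload: %.*s\n", (int)msg->len, (const char *)msg->payload);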
        }
        rd_kafka_message_destroy(msg);
    }
}

// Close the consumer and destroy the instance
// (reached only once the loop above is given an exit condition)
rd_kafka_consumer_close(rk);
rd_kafka_destroy(rk);
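
A loop on a constant true condition never reaches the close and destroy calls, so the consumer would leave its group only after a session timeout. One common way to get a clean exit is a flag cleared from a signal handler. The sketch below uses only the C standard library; `keep_running`, `request_stop`, and `install_stop_handlers` are hypothetical names chosen for illustration.

```c
#include <signal.h>

/* Cleared by the signal handler; the consumption loop tests it on each pass. */
volatile sig_atomic_t keep_running = 1;

static void request_stop(int signum) {
    (void)signum;
    keep_running = 0;
}

/* Hypothetical helper: installs the handler for Ctrl-C and termination
 * requests. Returns 0 on success, -1 if a handler could not be installed. */
int install_stop_handlers(void) {
    if (signal(SIGINT, request_stop) == SIG_ERR ||
        signal(SIGTERM, request_stop) == SIG_ERR)
        return -1;
    return 0;
}

/* In the consumer, call install_stop_handlers() before the loop and use
 *     while (keep_running) { ... rd_kafka_consumer_poll(rk, 1000) ... }
 * so that rd_kafka_consumer_close() and rd_kafka_destroy() are reached. */
```

With a one-second poll timeout, the loop notices the flag within about a second of the signal, after which the consumer can commit final offsets and leave the group through `rd_kafka_consumer_close()`.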

Additional Resources

  • For more in-depth information, refer to the official librdkafka repository and its examples.
  • Explore FlowMQ's advanced features for Kafka integration.