Connect FlowMQ with Kafka Rust Client

FlowMQ natively supports the Apache Kafka protocol, so Rust applications can connect to it with standard Kafka clients such as rust-rdkafka. This guide walks through producing and consuming messages with FlowMQ using that library.

Prerequisites

Before you start, ensure you have the following:

  • A running instance of FlowMQ.
  • The Rust toolchain (including rustc and cargo) installed.
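  • A C toolchain for building librdkafka, the C library that rust-rdkafka wraps. By default the crate compiles a bundled copy of librdkafka; a system-wide installation is needed only if you enable the crate's dynamic-linking feature.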

Installation

Add the rdkafka crate (rust-rdkafka) to your project's Cargo.toml file, along with tokio, the async runtime used by the examples in this guide.

toml
[dependencies]
rdkafka = "0.28"
tokio = { version = "1", features = ["full"] }

Then, run cargo build to fetch and compile the dependencies.

Producer: Connecting and Sending Messages

A producer sends messages to a topic in FlowMQ. rust-rdkafka provides a low-level, polling-based BaseProducer and an asynchronous FutureProducer whose send method returns a future. Here, we'll use the FutureProducer.

Configuration and Connection

rust
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");
    
    // ...
}
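In practice the broker address is rarely hard-coded. The following is a minimal sketch, using only the standard library, of collecting the producer settings shown above with the guide's values as defaults. The function name `producer_settings` and the environment variable `FLOWMQ_BOOTSTRAP_SERVERS` are hypothetical names chosen for this illustration; they are not part of FlowMQ or rust-rdkafka.

```rust
use std::env;

/// Key/value pairs in the string form that `ClientConfig::set` accepts.
type Settings = Vec<(&'static str, String)>;

/// Builds the producer settings, falling back to the guide's defaults
/// when a value is not supplied. (Hypothetical helper for this sketch.)
fn producer_settings(bootstrap: Option<String>, timeout_ms: Option<u32>) -> Settings {
    vec![
        (
            "bootstrap.servers",
            bootstrap.unwrap_or_else(|| "localhost:9092".to_string()),
        ),
        ("message.timeout.ms", timeout_ms.unwrap_or(5000).to_string()),
    ]
}

fn main() {
    // FLOWMQ_BOOTSTRAP_SERVERS is an assumed variable name, not a FlowMQ convention.
    let bootstrap = env::var("FLOWMQ_BOOTSTRAP_SERVERS").ok();
    let settings = producer_settings(bootstrap, None);

    // With rdkafka in scope, each pair would be applied with `config.set(key, value)`.
    for (key, value) in &settings {
        println!("{key} = {value}");
    }
}
```

Keeping the settings as plain pairs means the same list can be printed for debugging or fed to `ClientConfig::set` in a loop before calling `create()`.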

Sending a Message

Create a FutureRecord and pass it to the send method, together with the maximum time to wait if the producer's local queue is full. Awaiting the returned future tells you whether the message was delivered.

rust
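    // Continues inside `main` from the previous snippet.
    let topic = "my-topic";
    let payload = "Hello FlowMQ from Rust!";

    let record = FutureRecord::to(topic)
        .payload(payload)
        .key("my-key");

    // The second argument is how long to wait for space if the local
    // producer queue is full; zero means fail immediately instead of waiting.
    match producer.send(record, Duration::from_secs(0)).await {
        Ok(delivery) => println!("Message delivered: {:?}", delivery),
        Err((e, _)) => eprintln!("Error sending message: {}", e),
    }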
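To make the awaited result easier to read in logs, it can be turned into a message. The sketch below uses only the standard library and assumes that, with rdkafka 0.28, a successful delivery is reported as a `(partition, offset)` tuple; other versions of the crate may report it differently. The helper `describe_delivery` is a hypothetical name, and the error side is simplified to a `String` for illustration (the real error pairs a `KafkaError` with the undelivered message, as the `Err((e, _))` pattern above shows).

```rust
/// Assumed shape of a successful delivery report in rdkafka 0.28:
/// (partition, offset).
type Delivery = (i32, i64);

/// Turns the outcome of an awaited send into a log line.
/// The error is simplified to a String for this sketch.
fn describe_delivery(topic: &str, result: Result<Delivery, String>) -> String {
    match result {
        Ok((partition, offset)) => {
            format!("delivered to {topic}[{partition}] at offset {offset}")
        }
        Err(reason) => format!("delivery to {topic} failed: {reason}"),
    }
}

fn main() {
    // Stand-in values; in the real program these come from `producer.send(...).await`.
    println!("{}", describe_delivery("my-topic", Ok((0, 42))));
    println!(
        "{}",
        describe_delivery("my-topic", Err("message timed out".to_string()))
    );
}
```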

Consumer: Connecting and Receiving Messages

A consumer subscribes to topics and processes incoming messages. The StreamConsumer is a convenient way to handle messages as a continuous stream.

Configuration and Connection

rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};

#[tokio::main]
async fn main() {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", "my-rust-group")
        .set("bootstrap.servers", "localhost:9092")
        .set("auto.offset.reset", "earliest")
        .create()
        .expect("Consumer creation failed");

    // ...
}

Subscribing and Consuming

Subscribe the consumer to a topic and then loop over the message stream.

rust
// These imports go at the top of the file; the rest continues inside `main`.
use rdkafka::consumer::Consumer;
use rdkafka::message::Message;

    consumer
        .subscribe(&["my-topic"])
        .expect("Can't subscribe to specified topics");

    println!("Waiting for messages...");

    loop {
        match consumer.recv().await {
            Err(e) => eprintln!("Kafka error: {}", e),
            Ok(m) => {
                // Borrow the payload as UTF-8 text without copying;
                // fall back to an empty string if it is missing or invalid.
                let payload = match m.payload_view::<str>() {
                    None => "",
                    Some(Ok(s)) => s,
                    Some(Err(e)) => {
                        eprintln!("Error viewing payload: {:?}", e);
                        ""
                    }
                };
                println!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}",
                         m.key(), payload, m.topic(), m.partition(), m.offset());
            }
        };
    }
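The payload handling in the loop above can be pulled into a small function so it is easy to test without a broker. This is a minimal sketch using only the standard library; `payload_text` is a hypothetical helper name, and it works on the raw bytes that `Message::payload()` returns rather than on `payload_view`.

```rust
/// Decodes an optional raw payload as UTF-8 text.
/// A missing payload or invalid UTF-8 both yield an empty string,
/// mirroring the fallback used in the consumer loop.
fn payload_text(raw: Option<&[u8]>) -> &str {
    match raw.map(std::str::from_utf8) {
        None => "",
        Some(Ok(text)) => text,
        Some(Err(e)) => {
            eprintln!("Error viewing payload: {e}");
            ""
        }
    }
}

fn main() {
    // Stand-in inputs; in the consumer loop this would be `payload_text(m.payload())`.
    let invalid: &[u8] = &[0xff, 0xfe];
    println!("'{}'", payload_text(Some("Hello FlowMQ from Rust!".as_bytes())));
    println!("'{}'", payload_text(None));
    println!("'{}'", payload_text(Some(invalid)));
}
```

Returning a borrowed `&str` keeps the zero-copy behavior of the original match; if invalid payloads should be surfaced rather than hidden, returning a `Result` instead would be the natural variation.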

Additional Resources

  • For more advanced features and detailed documentation, visit the official rust-rdkafka repository.
  • Explore FlowMQ's advanced features for Kafka integration.