Connect FlowMQ with Kafka Rust Client
FlowMQ's native support for the Apache Kafka protocol allows Rust developers to build high-performance, reliable applications using clients like rust-rdkafka
. This guide will walk you through producing and consuming messages from FlowMQ using this powerful library.
Prerequisites
Before you start, ensure you have the following:
- A running instance of FlowMQ.
- The Rust toolchain (including
rustc
andcargo
) installed. librdkafka
installed on your system, asrust-rdkafka
is a wrapper around it.
Installation
Add rust-rdkafka
as a dependency in your project's Cargo.toml
file.
[dependencies]
rdkafka = "0.28"
tokio = { version = "1", features = ["full"] }
Then, run cargo build
to fetch and compile the dependencies.
Producer: Connecting and Sending Messages
A producer sends messages to a topic in FlowMQ. rust-rdkafka
provides both a blocking and a future-based producer. Here, we'll use the FutureProducer
.
Configuration and Connection
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
#[tokio::main]
async fn main() {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
// ...
}
Sending a Message
Create a FutureRecord
and use the send
method. You can await
the result to see if the message was delivered successfully.
let topic = "my-topic";
let payload = "Hello FlowMQ from Rust!";
let record = FutureRecord::to(topic)
.payload(payload)
.key("my-key");
match producer.send(record, Duration::from_secs(0)).await {
Ok(delivery) => println!("Message delivered to {:?}", delivery),
Err((e, _)) => println!("Error sending message: {}", e),
}
Consumer: Connecting and Receiving Messages
A consumer subscribes to topics and processes incoming messages. The StreamConsumer
is a convenient way to handle messages as a continuous stream.
Configuration and Connection
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
#[tokio::main]
async fn main() {
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", "my-rust-group")
.set("bootstrap.servers", "localhost:9092")
.set("auto.offset.reset", "earliest")
.create()
.expect("Consumer creation failed");
// ...
}
Subscribing and Consuming
Subscribe the consumer to a topic and then loop over the message stream.
use rdkafka::consumer::Consumer;
use rdkafka::message::Message;
consumer
.subscribe(&["my-topic"])
.expect("Can't subscribe to specified topics");
println!("Waiting for messages...");
loop {
match consumer.recv().await {
Err(e) => println!("Kafka error: {}", e),
Ok(m) => {
let payload = match m.payload_view::<str>() {
None => "",
Some(Ok(s)) => s,
Some(Err(e)) => {
println!("Error viewing payload: {:?}", e);
""
}
};
println!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}",
m.key(), payload, m.topic(), m.partition(), m.offset());
}
};
}
Additional Resources
- For more advanced features and detailed documentation, visit the official rust-rdkafka repository.
- Explore FlowMQ's advanced features for Kafka integration.