Connect FlowMQ with MQTT Rust Client
FlowMQ's native support for the MQTT protocol makes it a good match for high-performance, memory-safe applications built with Rust. This guide walks you through using rumqttc, a popular pure-Rust MQTT client, to connect to FlowMQ, subscribe to topics, and publish messages.
Prerequisites
Before you begin, ensure you have the following:
- A running instance of FlowMQ.
- The Rust toolchain (including rustc and cargo) installed.
Installation
Add rumqttc and tokio (for the async runtime) as dependencies in your project's Cargo.toml file.
[dependencies]
rumqttc = "0.21.0"
tokio = { version = "1", features = ["full"] }
Then, run cargo build to fetch and compile the dependencies.
Connecting and Subscribing
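With rumqttc, you create the client from a set of configuration options. AsyncClient::new returns the client together with an event loop. The network connection is opened, and queued requests are sent, only while the event loop is being polled (see Receiving Messages). You can queue a subscription right after creating the client.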
use rumqttc::{MqttOptions, AsyncClient, QoS};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Client ID, broker host, and broker port
    let mut mqttoptions = MqttOptions::new("my-rust-client", "localhost", 1883);
    mqttoptions.set_keep_alive(Duration::from_secs(5));

    // The second argument is the capacity of the request channel
    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);

    // Subscribe to a topic
    client.subscribe("flowmq/test/rust", QoS::AtLeastOnce).await.unwrap();
    println!("Subscribed to 'flowmq/test/rust'");

    // ... publishing and message handling logic here ...
}
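MQTT brokers identify each session by its client ID, and the MQTT specification has the broker disconnect an existing client when another one connects with the same ID. If several copies of a program may run at the same time, one option is to derive the ID from something unique to the process. The following is a minimal sketch using only the standard library; the helper name `unique_client_id` and the prefix-plus-process-ID layout are illustrative assumptions, not part of rumqttc or FlowMQ.

```rust
use std::process;

/// Hypothetical helper: builds a client ID of the form "<prefix>-<pid>".
/// The layout is an assumption chosen for illustration only.
fn unique_client_id(prefix: &str) -> String {
    // The operating system's ID for the current process.
    let pid = process::id();
    format!("{}-{}", prefix, pid)
}
```

The result could be passed as the first argument to MqttOptions::new in place of the fixed "my-rust-client" string. Some brokers restrict the length or character set of client IDs, so check what your deployment accepts.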
Publishing Messages
You can use the publish method on the client to send messages. Its arguments are the topic, the QoS level, the retain flag, and the payload.
// Publish a message
client.publish("flowmq/test/rust", QoS::AtLeastOnce, false, "Hello from Rust!").await.unwrap();
println!("Message published.");
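MQTT treats `+` and `#` as wildcards that are meaningful only in subscriptions. The specification does not allow them in the topic name of a published message, and it requires topic names to be non-empty. A small check before calling publish can catch such mistakes early. This is a minimal sketch using only the standard library; `is_valid_publish_topic` is a hypothetical helper, not a rumqttc function.

```rust
/// Hypothetical helper: returns true if `topic` passes the checks below
/// for use as the topic name of a published MQTT message.
fn is_valid_publish_topic(topic: &str) -> bool {
    // Topic names must contain at least one character.
    if topic.is_empty() {
        return false;
    }
    // The UTF-8 encoded topic is limited to 65,535 bytes.
    if topic.len() > 65_535 {
        return false;
    }
    // Wildcards and the null character are not allowed when publishing.
    !topic.chars().any(|c| c == '+' || c == '#' || c == '\0')
}
```

For example, "flowmq/test/rust" passes, while "flowmq/+/rust" and "flowmq/#" are rejected because they contain wildcards.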
Receiving Messages
The eventloop drives the connection: each call to poll() advances the network I/O and returns the next event, such as an incoming message or a connection acknowledgment. Call it in a loop to process events as they arrive.
// Poll the eventloop in a loop for connection events and messages
loop {
    let event = eventloop.poll().await;
    match &event {
        Ok(rumqttc::Event::Incoming(rumqttc::Packet::Publish(p))) => {
            println!("Received message: {:?}", p);
            // In this simple example, we break after receiving one message.
            break;
        }
        Ok(rumqttc::Event::Incoming(rumqttc::Packet::ConnAck(_))) => {
            println!("Connected!");
        }
        Err(e) => {
            println!("Error: {:?}", e);
            break;
        }
        _ => {} // Ignore other events
    }
}
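The loop above stops at the first error. A longer-running client might instead wait and poll again, since rumqttc's documentation describes continued polling after an error as the way to reconnect. A common way to space out such retries is exponential backoff with an upper bound. The following is a minimal sketch using only the standard library; the helper name `backoff_delay`, the 1-second base, and the 60-second cap are assumptions chosen for illustration.

```rust
use std::time::Duration;

/// Hypothetical helper: delay to wait before retry number `attempt` (0-based).
/// The delay doubles from 1 second and is capped at 60 seconds.
fn backoff_delay(attempt: u32) -> Duration {
    const BASE_SECS: u64 = 1;
    const MAX_SECS: u64 = 60;
    // checked_shl returns None once the shift would exceed the width of u64;
    // treat that case as "very large" so the cap below applies.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    let secs = BASE_SECS.saturating_mul(factor).min(MAX_SECS);
    Duration::from_secs(secs)
}
```

In the Err arm, one could then call tokio::time::sleep(backoff_delay(attempt)).await and continue the loop instead of breaking, resetting the attempt counter to 0 after a ConnAck arrives.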
Full Example
Here is a complete Rust program that connects, subscribes, publishes a message, and waits to receive it.
use rumqttc::{MqttOptions, AsyncClient, QoS};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Configure and create the client
    let mut mqttoptions = MqttOptions::new("rust-client-example", "localhost", 1883);
    mqttoptions.set_keep_alive(Duration::from_secs(5));
    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
    println!("Connecting...");

    // 2. Subscribe to the topic (the request is queued until the event loop is polled)
    client.subscribe("flowmq/rust/test", QoS::AtLeastOnce).await?;
    println!("Subscribed!");

    // 3. Publish a message
    client.publish("flowmq/rust/test", QoS::AtLeastOnce, false, "Hello FlowMQ from Rust").await?;
    println!("Message published.");

    // 4. Poll the event loop to process events
    loop {
        let event = eventloop.poll().await?;
        if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish)) = event {
            println!("\nReceived message:");
            println!(" Topic: {}", publish.topic);
            println!(" Payload: {:?}", String::from_utf8_lossy(&publish.payload));
            break; // Exit after receiving the message
        }
    }
    Ok(())
}
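The example prints the payload with String::from_utf8_lossy, which silently replaces invalid bytes with the U+FFFD replacement character. MQTT payloads are arbitrary bytes, so when a subscriber needs to tell text apart from binary data, strict decoding is an alternative. The following is a minimal sketch using only the standard library; `describe_payload` and its output format are hypothetical and exist only for illustration.

```rust
/// Hypothetical helper: renders a payload as text when it is valid UTF-8,
/// and as space-separated hex bytes otherwise.
fn describe_payload(payload: &[u8]) -> String {
    match std::str::from_utf8(payload) {
        Ok(text) => format!("text: {}", text),
        Err(_) => {
            // Format each byte as two lowercase hex digits.
            let hex: Vec<String> = payload.iter().map(|b| format!("{:02x}", b)).collect();
            format!("binary: {}", hex.join(" "))
        }
    }
}
```

In the full example, this could be called as describe_payload(&publish.payload) in place of the from_utf8_lossy call.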
Additional Resources
- For more advanced usage, including different QoS levels, clean sessions, and error handling, refer to the official rumqttc repository.
- Explore FlowMQ's advanced features for MQTT.