Connect FlowMQ with MQTT Rust Client
FlowMQ's native support for the MQTT protocol makes it a perfect match for high-performance, memory-safe applications built with Rust. This guide will walk you through using rumqttc, a popular and robust pure Rust MQTT client, to connect to FlowMQ, publish, and subscribe to messages.
Prerequisites
Before you begin, ensure you have the following:
- A running instance of FlowMQ.
- The Rust toolchain (including rustc and cargo) installed.
Installation
Add rumqttc and tokio (for the async runtime) as dependencies in your project's Cargo.toml file.
[dependencies]
rumqttc = "0.21.0"
tokio = { version = "1", features = ["full"] }
Then, run cargo build to fetch and compile the dependencies.
Connecting and Subscribing
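With rumqttc, you create the client from a set of configuration options. AsyncClient::new returns the client together with an event loop. The network connection is only established once the event loop is polled, and requests made on the client are queued until then. You can therefore call subscribe right after creating the client, but nothing is sent to FlowMQ until polling starts.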
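The second argument to AsyncClient::new (10 in the snippet in this section) sets the capacity of the request queue that sits between the client handle and the event loop. The sketch below illustrates what a bounded queue does when nothing drains it. It uses the standard library's sync_channel as a stand-in and does not reproduce rumqttc's internals; the function name queue_without_polling is hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Try to queue `requests` items on a bounded channel of the given `capacity`
/// while nothing is draining it. Returns (accepted, rejected).
/// Stand-in only: this is std's `sync_channel`, not rumqttc's own queue.
fn queue_without_polling(capacity: usize, requests: usize) -> (usize, usize) {
    // `_rx` keeps the receiving side alive but never reads from it,
    // which mimics an event loop that is never polled.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for id in 0..requests {
        match tx.try_send(id) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

With a capacity of 10, the first ten requests fit and later ones are turned away until something drains the queue. With the async client, a call made against a full queue should wait for space rather than fail, so a program that issues many requests without polling the event loop can stall.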
use rumqttc::{MqttOptions, AsyncClient, QoS};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut mqttoptions = MqttOptions::new("my-rust-client", "localhost", 1883);
    mqttoptions.set_keep_alive(Duration::from_secs(5));
    // The second argument is the capacity of the request queue
    // between the client and the event loop.
    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
    // Subscribe to a topic (the request is sent once the event loop is polled)
    client.subscribe("flowmq/test/rust", QoS::AtLeastOnce).await.unwrap();
    println!("Subscription requested for 'flowmq/test/rust'");
    // ... publishing and message handling logic here ...
}
Publishing Messages
Use the client's publish method to send a message. It takes the topic, the QoS level, a retain flag, and the payload.
// Publish a message
client.publish("flowmq/test/rust", QoS::AtLeastOnce, false, "Hello from Rust!").await.unwrap();
println!("Message published.");
Receiving Messages
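The event loop drives the connection: each call to eventloop.poll() performs network I/O and returns the next event, such as an incoming message or a connection acknowledgment. Call it in a loop to process events. If polling stops, so does all client activity, including keep-alive pings.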
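Each incoming Publish packet carries its payload as raw bytes, and MQTT does not require those bytes to be valid UTF-8. As a minimal sketch of preparing a payload for logging, the hypothetical helper payload_to_text below returns the text when the bytes are valid UTF-8 and falls back to a hex rendering otherwise. It uses only the standard library; the "0x" prefix and the hex format are illustrative choices, not part of rumqttc or FlowMQ.

```rust
use std::str;

/// Render an MQTT payload for logging: valid UTF-8 is returned as text,
/// anything else as a lowercase hex string prefixed with "0x".
fn payload_to_text(payload: &[u8]) -> String {
    match str::from_utf8(payload) {
        Ok(text) => text.to_owned(),
        Err(_) => {
            // Two hex digits per byte, in order.
            let hex: String = payload.iter().map(|b| format!("{:02x}", b)).collect();
            format!("0x{}", hex)
        }
    }
}
```

Inside the receive loop, the call would look like payload_to_text(&p.payload). Unlike String::from_utf8_lossy, this keeps binary payloads distinguishable from text instead of substituting replacement characters.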
// Iterate to poll the eventloop for connection events and messages
loop {
    let event = eventloop.poll().await;
    match &event {
        Ok(rumqttc::Event::Incoming(rumqttc::Packet::Publish(p))) => {
            println!("Received message: {:?}", p);
            // In this simple example, we break after receiving one message.
            break;
        }
        Ok(rumqttc::Event::Incoming(rumqttc::Packet::ConnAck(_))) => {
            println!("Connected!");
        }
        Err(e) => {
            println!("Error: {:?}", e);
            break;
        }
        _ => {} // Ignore other events
    }
}
Full Example
Here is a complete Rust program that connects, subscribes, publishes a message, and waits to receive it.
use rumqttc::{MqttOptions, AsyncClient, QoS};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Configure and create the client
    let mut mqttoptions = MqttOptions::new("rust-client-example", "localhost", 1883);
    mqttoptions.set_keep_alive(Duration::from_secs(5));
    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
    println!("Connecting...");

    // 2. Subscribe to the topic
    client.subscribe("flowmq/rust/test", QoS::AtLeastOnce).await?;
    println!("Subscribed!");

    // 3. Publish a message
    client.publish("flowmq/rust/test", QoS::AtLeastOnce, false, "Hello FlowMQ from Rust").await?;
    println!("Message published.");

    // 4. Poll the event loop to process events
    loop {
        let event = eventloop.poll().await?;
        if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish)) = event {
            println!("\nReceived message:");
            println!(" Topic: {}", publish.topic);
            println!(" Payload: {:?}", String::from_utf8_lossy(&publish.payload));
            break; // Exit after receiving the message
        }
    }
    Ok(())
}
Additional Resources
- For more advanced usage, including different QoS levels, clean sessions, and error handling, refer to the official rumqttc repository.
- Explore FlowMQ's advanced features for MQTT.
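As a starting point for the error handling mentioned above: the receive loop in this guide stops at the first error, whereas rumqttc's documentation describes reconnecting by simply continuing to call poll. A long-running client would usually pause between attempts. The sketch below computes a capped exponential delay using only the standard library; the function name backoff_delay and the base and cap values in the usage note are hypothetical choices, not rumqttc or FlowMQ settings.

```rust
use std::time::Duration;

/// Delay before reconnect attempt number `attempt` (0-based):
/// `base * 2^attempt`, capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // checked_pow and checked_mul return None on overflow;
    // in that case the delay falls back to the cap.
    let factor = 2u32.checked_pow(attempt);
    match factor.and_then(|f| base.checked_mul(f)) {
        Some(delay) if delay < max => delay,
        _ => max,
    }
}
```

One way to use it, assuming a base of 500 ms and a cap of 30 s: in the Err arm of the receive loop, replace break with tokio::time::sleep(backoff_delay(attempt, base, max)).await, increment attempt, and reset it to 0 when a ConnAck arrives.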