Connect FlowMQ with MQTT Rust Client

FlowMQ natively supports the MQTT protocol, which makes it a good fit for high-performance, memory-safe applications written in Rust. This guide walks through using rumqttc, a popular pure-Rust MQTT client, to connect to FlowMQ, subscribe to a topic, and publish and receive messages.

Prerequisites

Before you begin, ensure you have the following:

  • A running instance of FlowMQ.
  • The Rust toolchain (including rustc and cargo) installed.

Installation

Add rumqttc and tokio (for the async runtime) as dependencies in your project's Cargo.toml file.

toml
[dependencies]
rumqttc = "0.21.0"
tokio = { version = "1", features = ["full"] }

Then, run cargo build to fetch and compile the dependencies.

Connecting and Subscribing

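With rumqttc, you describe the connection in MqttOptions and pass it to AsyncClient::new, which returns a client handle and an event loop. The client does not open the connection by itself: the connection is established, and requests are sent, only once the event loop is polled. You can still call subscribe right after creating the client, because the request is queued until polling begins.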

rust
use rumqttc::{MqttOptions, AsyncClient, QoS};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut mqttoptions = MqttOptions::new("my-rust-client", "localhost", 1883);
    mqttoptions.set_keep_alive(Duration::from_secs(5));

    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);

    // Queue a subscribe request; it is sent once the event loop is polled
    client.subscribe("flowmq/test/rust", QoS::AtLeastOnce).await.unwrap();
    println!("Subscription requested for 'flowmq/test/rust'");

    // ... publishing and message handling logic here ...
}
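
The subscription above names one exact topic, but MQTT topic filters may also contain the wildcards `+` (exactly one level) and `#` (all remaining levels, last position only). The broker performs this matching for you; the std-only sketch below only illustrates the rule. `topic_matches` is a hypothetical helper, not part of rumqttc, and it deliberately ignores the special handling of topics that begin with `$`.

```rust
/// Returns true if `topic` matches the MQTT topic `filter`.
/// Illustrative only: does not validate the filter and does not
/// special-case topics starting with `$`.
fn topic_matches(filter: &str, topic: &str) -> bool {
    let mut filter_levels = filter.split('/');
    let mut topic_levels = topic.split('/');
    loop {
        match (filter_levels.next(), topic_levels.next()) {
            // `#` matches every remaining level (including none),
            // but only when it is the last level of the filter.
            (Some("#"), _) => return filter_levels.next().is_none(),
            // `+` matches exactly one level, whatever its content.
            (Some("+"), Some(_)) => continue,
            // Literal levels must be identical.
            (Some(f), Some(t)) if f == t => continue,
            // Both sides exhausted at the same time: full match.
            (None, None) => return true,
            _ => return false,
        }
    }
}
```

Under these rules, a filter such as `flowmq/+/rust` or `flowmq/#` would also receive messages published to `flowmq/test/rust`, while `flowmq/+` would not.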

Publishing Messages

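Use the client's publish method to send a message. It takes the topic, the QoS level, the retain flag, and the payload. As with subscribe, the message goes out on the network only while the event loop is being polled.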

rust
    // Publish a message (arguments: topic, QoS, retain flag, payload)
    client.publish("flowmq/test/rust", QoS::AtLeastOnce, false, "Hello from Rust!").await.unwrap();
    println!("Publish request queued.");

Receiving Messages

The event loop drives the connection. Each call to poll() advances network I/O and returns the next event, such as a connection acknowledgment or an incoming message. Call it in a loop to process events as they arrive.

rust
    // Poll the event loop repeatedly for connection events and messages
    loop {
        let event = eventloop.poll().await;
        match &event {
            Ok(rumqttc::Event::Incoming(rumqttc::Packet::Publish(p))) => {
                println!("Received message: {:?}", p);
                // In this simple example, we break after receiving one message.
                break;
            }
            Ok(rumqttc::Event::Incoming(rumqttc::Packet::ConnAck(_))) => {
                println!("Connected!");
            }
            Err(e) => {
                println!("Error: {:?}", e);
                break;
            }
            _ => {} // Ignore other events
        }
    }
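
An MQTT payload is an arbitrary byte sequence, so it is not guaranteed to be valid text. The sketch below shows one way to turn a payload into something printable: it returns the text when the bytes are valid UTF-8 and falls back to a hex dump otherwise. `payload_to_text` is a hypothetical helper introduced for illustration and uses only the standard library.

```rust
/// Renders a message payload for display.
/// Valid UTF-8 is returned as text; anything else becomes
/// space-separated lowercase hex bytes.
fn payload_to_text(payload: &[u8]) -> String {
    match std::str::from_utf8(payload) {
        Ok(text) => text.to_owned(),
        Err(_) => payload
            .iter()
            .map(|byte| format!("{:02x}", byte))
            .collect::<Vec<_>>()
            .join(" "),
    }
}
```

Inside the receive loop it could be called as `payload_to_text(&p.payload)`, since the payload dereferences to a byte slice.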

Full Example

Here is a complete Rust program that connects, subscribes, publishes a message, and waits to receive it.

rust
use rumqttc::{MqttOptions, AsyncClient, QoS};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Configure and create the client
    let mut mqttoptions = MqttOptions::new("rust-client-example", "localhost", 1883);
    mqttoptions.set_keep_alive(Duration::from_secs(5));

    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
    println!("Connecting...");

    // 2. Queue a subscribe request for the topic
    client.subscribe("flowmq/rust/test", QoS::AtLeastOnce).await?;
    println!("Subscription requested.");

    // 3. Queue a message to publish
    client.publish("flowmq/rust/test", QoS::AtLeastOnce, false, "Hello FlowMQ from Rust").await?;
    println!("Publish request queued.");

    // 4. Poll the event loop: this opens the connection, sends the queued
    //    requests, and delivers incoming events
    loop {
        let event = eventloop.poll().await?;

        if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish)) = event {
            println!("\nReceived message:");
            println!("  Topic: {}", publish.topic);
            println!("  Payload: {:?}", String::from_utf8_lossy(&publish.payload));
            break; // Exit after receiving the message
        }
    }

    Ok(())
}
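
The full example calls `subscribe` and `publish` before the first `poll`. This works because the second argument to `AsyncClient::new` (10 here) is the capacity of the bounded request channel between the client and the event loop, so a few requests can wait there until polling starts. The sketch below models that behavior with a standard-library bounded channel. It is an analogy under stated assumptions, not rumqttc's actual implementation, and `queued_without_polling` is a hypothetical name.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to queue `attempts` requests on a bounded channel of size `cap`
/// while nothing is consuming them, and returns how many were accepted.
fn queued_without_polling(cap: usize, attempts: usize) -> usize {
    // `_rx` stands in for an event loop that has not been polled yet;
    // keeping it alive keeps the channel connected.
    let (tx, _rx) = sync_channel::<usize>(cap);
    let mut accepted = 0;
    for request in 0..attempts {
        match tx.try_send(request) {
            Ok(()) => accepted += 1,
            // The buffer is full: an awaiting sender would wait here
            // until the consumer drains the channel.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a capacity of 10, the two requests in the example fit comfortably. A program that queues more requests than the capacity before polling would stall on the await, which is one reason to run the polling loop in its own task in larger applications.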

Additional Resources

  • For more advanced usage, including different QoS levels, clean sessions, and error handling, refer to the official rumqttc repository.
  • Explore FlowMQ's advanced features for MQTT.