Skip to content

Connect FlowMQ with AMQP Rust Client

FlowMQ's native support for AMQP 0.9.1 allows Rust developers to build high-performance, safe, and concurrent messaging applications. This guide will walk you through using lapin, a popular and robust AMQP client for Rust, to connect to FlowMQ, declare queues, publish, and consume messages.

Prerequisites

Before you start, ensure you have the following:

  • A running instance of FlowMQ.
  • The Rust toolchain (including rustc and cargo) installed.

Installation

Add lapin and an async runtime like tokio to your project's Cargo.toml file.

toml
[dependencies]
lapin = "2.1.1"
tokio = { version = "1", features = ["full"] }

Then, run cargo build to fetch and compile the dependencies.

Connecting to FlowMQ

First, establish a connection to your FlowMQ broker and create a channel. All AMQP operations are performed on a channel.

rust
use tokio::runtime::Runtime;
use lapin::{
    options::*,
    types::FieldTable,
    Connection,
    ConnectionProperties,
    Result,
};

#[tokio::main]
async fn main() -> Result<()> {
    let addr = "amqp://guest:guest@localhost:5672/%2f";
    let conn = Connection::connect(&addr, ConnectionProperties::default()).await?;

    println!("Connected to FlowMQ!");

    let channel = conn.create_channel().await?;

    // ... declare, publish, and consume logic here ...

    Ok(())
}

Declaring a Queue

For simplicity, we'll declare a queue and use the default exchange. Messages sent to the default exchange with the queue's name as the routing key will be delivered to that queue.

rust
// Inside the main async function
let queue_name = "hello_rust";

let _queue = channel
    .queue_declare(
        queue_name,
        QueueDeclareOptions::default(),
        FieldTable::default(),
    )
    .await?;

println!("Queue '{}' declared.", queue_name);

Publishing a Message

Now, you can publish a message to the default exchange with the queue name as the routing key.

rust
let payload = b"Hello FlowMQ from Rust!";
let confirm = channel
    .basic_publish(
        "", // Default exchange
        queue_name,
        BasicPublishOptions::default(),
        payload,
        BasicProperties::default(),
    )
    .await?
    .await?; // Wait for the publisher confirm

assert!(confirm.is_ack());
println!("Message published successfully.");

Consuming Messages

To consume messages, create a consumer and iterate over the delivered messages.

rust
let mut consumer = channel
    .basic_consume(
        queue_name,
        "my_consumer",
        BasicConsumeOptions::default(),
        FieldTable::default(),
    )
    .await?;

println!("Consumer created, waiting for messages...");

if let Some(delivery) = consumer.next().await {
    let delivery = delivery?;
    println!("Received message: {:?}", std::str::from_utf8(&delivery.data)?);
    delivery.ack(BasicAckOptions::default()).await?;
}

Full Example

Here is a complete Rust program that connects, declares a queue, publishes, and then consumes a message.

rust
use tokio_stream::StreamExt;
use lapin::{
    options::*,
    types::FieldTable,
    Connection,
    ConnectionProperties,
    Result, BasicProperties,
};

#[tokio::main]
async fn main() -> Result<()> {
    let addr = "amqp://guest:guest@localhost:5672/%2f";
    let queue_name = "hello_lapin";

    // Connect and create channel
    let conn = Connection::connect(&addr, ConnectionProperties::default()).await?;
    let channel = conn.create_channel().await?;
    println!("CONNECTED");

    // Declare queue
    let _queue = channel
        .queue_declare(
            queue_name,
            QueueDeclareOptions::default(),
            FieldTable::default(),
        )
        .await?;

    // Create consumer
    let mut consumer = channel
        .basic_consume(
            queue_name,
            "my_consumer",
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await?;
    
    // Publish message
    let payload = b"Hello from lapin!";
    channel
        .basic_publish(
            "",
            queue_name,
            BasicPublishOptions::default(),
            payload,
            BasicProperties::default(),
        )
        .await?
        .await?;
    println!("Message published!");

    // Consume message
    if let Some(delivery) = consumer.next().await {
        let delivery = delivery?;
        delivery.ack(BasicAckOptions::default()).await?;
        println!("Received: {}", std::str::from_utf8(&delivery.data)?);
    }
    
    Ok(())
}

Additional Resources

  • For more advanced scenarios and features, check out the official lapin repository.
  • Explore FlowMQ's advanced features for AMQP integration.