Skip to content

Connect FlowMQ with the Rust SDK

The Rust bindings of Flow SDK are built on the tokio runtime and are designed for performance, safety, and ergonomic asynchronous programming with async/await.

Prerequisites

  • Rust 1.56+: The SDK uses modern async features.
  • Cargo: For dependency management.
  • Tokio Runtime: The examples assume you are using tokio as your async runtime.
  • Running FlowMQ Instance: Access to a FlowMQ broker, assumed to be at localhost:7070.

Installation

The Rust Flow SDK is available on crates.io. Add it to your project using cargo:

shell
cargo add flowmq-sdk

You will also need tokio if you don't already have it:

shell
cargo add tokio --features full

Core Concepts

  • FlowClient: The main client for interacting with FlowMQ. It manages the underlying gRPC connection and is designed to be created once and shared (e.g., via Arc).
  • ClientConfig: A configuration struct, created with a builder pattern, to set up the FlowClient.
  • Authentication: A helper for providing credentials.
  • Producer: An object created from the FlowClient for publishing messages.
  • Consumer: An object responsible for receiving messages.
  • Message: A struct representing a message, built using a builder pattern.
  • Queue & Stream: Structs defining the source you want to consume from.
  • async/await: Used for all I/O-bound operations.
  • Result<T, E>: Used for all operations that can fail, promoting robust error handling.

Establishing a Connection

You create a FlowClient by building a ClientConfig and passing it to FlowClient::new. All operations are async.

Configuration and Creation

rust
use flowmq_sdk::{FlowClient, ClientConfig, Authentication};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();

    let client = FlowClient::new(config).await?;
    
    println!("Successfully connected to FlowMQ!");
    // Your application logic here

    client.close().await?;
    Ok(())
}

Replace YOUR_API_TOKEN with your actual credentials.

Publishing Messages

To publish messages, create a Producer from the client. The publish method is an async function that returns a Result.

Example: Publishing a Message to a Topic

rust
use flowmq_sdk::{FlowClient, ClientConfig, Authentication, Message};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();

    let client = FlowClient::new(config).await?;
    let producer = client.new_producer();

    let message = Message::builder()
        .payload(b"Hello from Rust!")
        .header("content-type", "text/plain")
        .build();

    match producer.publish("sensor.v1.data", message).await {
        Ok(receipt) => {
            println!("Message published successfully! ID: {}", receipt.message_id());
        }
        Err(e) => {
            eprintln!("Failed to publish message: {}", e);
        }
    }

    client.close().await?;
    Ok(())
}

Consuming Messages

You can consume messages from a Queue or a Stream by calling consume and passing an async message handler closure. The consume method will run the handler for each received message. It's often run as a background task.

Consuming from a Queue

When consuming from a queue, you must explicitly ack() each message after successful processing.

rust
use flowmq_sdk::{FlowClient, ClientConfig, Authentication, Queue};
use std::error::Error;
use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();
    let client = FlowClient::new(config).await?;

    let task_queue = Queue::new("image.processing.tasks");

    println!("Starting consumer on queue: {}...", task_queue.name());
    
    // The consume method takes an async closure as a handler
    let consume_future = client.consume(task_queue, |message| async move {
        println!("Received from queue: {}", String::from_utf8_lossy(message.payload()));
        // Process message...
        println!("Processing complete. Acknowledging message.");
        if let Err(e) = message.ack().await {
            eprintln!("Failed to ack message: {}", e);
        }
    });

    // Run the consumer until Ctrl-C is pressed
    tokio::select! {
        res = consume_future => {
            if let Err(e) = res {
                eprintln!("Consumer error: {}", e);
            }
        },
        _ = signal::ctrl_c() => {
            println!("Consumer stopped by user.");
        }
    }
    
    client.close().await?;
    Ok(())
}

Consuming from a Stream

When consuming from a stream, you commit offsets to track progress.

rust
use flowmq_sdk::{FlowClient, ClientConfig, Authentication, Stream, StartPosition};
use std::error::Error;
use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();
    let client = FlowClient::new(config).await?;

    let event_stream = Stream::new("user.activity.events", StartPosition::Earliest);

    println!("Starting consumer on stream: {}...", event_stream.name());
    
    let consume_future = client.consume(event_stream, |message| async move {
        println!("Received from stream (offset: {}): {}", message.offset(), String::from_utf8_lossy(message.payload()));
        // Process message...
        println!("Processing complete. Committing offset.");
        if let Err(e) = message.commit_offset().await {
            eprintln!("Failed to commit offset: {}", e);
        }
    });

    tokio::select! {
        res = consume_future => {
            if let Err(e) = res {
                eprintln!("Consumer error: {}", e);
            }
        },
        _ = signal::ctrl_c() => {
            println!("Consumer stopped by user.");
        }
    }

    client.close().await?;
    Ok(())
}

Full End-to-End Example

This example combines publishing and consuming using tokio::spawn to run the consumer as a background task.

rust
use flowmq_sdk::{
    Authentication, ClientConfig, FlowClient, Message, Queue,
};
use std::error::Error;
use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();
    
    // Use an Arc to share the client between tasks
    let client = Arc::new(FlowClient::new(config).await?);
    
    // 1. Consumer Setup (run in background)
    // (Assumes 'e2e.topic' is routed to 'e2e.queue' in FlowMQ)
    let consumer_client = client.clone();
    let message_received_notify = Arc::new(Notify::new());
    let notify_clone = message_received_notify.clone();

    let consumer_handle = tokio::spawn(async move {
        let e2e_queue = Queue::new("e2e.queue");
        let result = consumer_client.consume(e2e_queue, move |message| {
            let notify = notify_clone.clone();
            async move {
                println!("Received message: {}", String::from_utf8_lossy(message.payload()));
                message.ack().await.unwrap();
                notify.notify_one(); // Signal that the message was received
            }
        }).await;

        if let Err(e) = result {
            eprintln!("Consumer exited with error: {}", e);
        }
    });

    println!("Consumer started and waiting for messages...");
    
    // 2. Producer: Publish a message
    let producer = client.new_producer();
    let message = Message::builder().payload(b"End-to-End Test Message!").build();
    
    producer.publish("e2e.topic", message).await?;
    println!("Message published successfully.");

    // 3. Wait for the consumer to receive the message
    message_received_notify.notified().await;
    println!("Successfully received and acknowledged message.");
    
    // Cleanly shut down
    consumer_handle.abort();
    client.close().await?;
    Ok(())
}

Additional Resources

  • Error Handling: Embrace Rust's Result type. Use ? for concise error propagation and match or if let for granular error handling.
  • Concurrency: Use Arc to share clients and other state safely across tokio tasks. Use channels or Notify for task communication.
  • Advanced Configuration: Explore the ClientConfig::builder() for more options like timeouts and TLS.
  • Flow SDK Overview: For a broader understanding of the SDK's architecture, refer to the Flow SDK Developer Guide.