Connect FlowMQ with the Rust SDK
The Rust bindings of Flow SDK are built on the tokio runtime and are designed for performance, safety, and ergonomic asynchronous programming with async/await.
Prerequisites
- Rust 1.56+: The SDK uses modern async features.
- Cargo: For dependency management.
- Tokio Runtime: The examples assume you are using tokio as your async runtime.
- Running FlowMQ Instance: Access to a FlowMQ broker, assumed to be at localhost:7070.
Installation
The Rust Flow SDK is available on crates.io. Add it to your project with cargo add (built into Cargo since Rust 1.62; older toolchains need the cargo-edit plugin):
cargo add flowmq-sdk
You will also need tokio if you don't already have it:
cargo add tokio --features full
Core Concepts
- FlowClient: The main client for interacting with FlowMQ. It manages the underlying gRPC connection and is designed to be created once and shared (e.g., via Arc).
- ClientConfig: A configuration struct, created with a builder pattern, to set up the FlowClient.
- Authentication: A helper for providing credentials.
- Producer: An object created from the FlowClient for publishing messages.
- Consumer: An object responsible for receiving messages.
- Message: A struct representing a message, built using a builder pattern.
- Queue & Stream: Structs defining the source you want to consume from.
- async/await: Used for all I/O-bound operations.
- Result<T, E>: Used for all operations that can fail, promoting robust error handling.
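For readers new to the builder pattern mentioned above, the following is a minimal, standard-library-only sketch of how such a builder is commonly structured in Rust. The names SketchConfig, SketchConfigBuilder, the connect_timeout field, and the default values are hypothetical and chosen for illustration; they are not the SDK's actual types or defaults.

```rust
use std::time::Duration;

/// Hypothetical stand-in for a client configuration (not the SDK's real type).
#[derive(Debug, Clone, PartialEq)]
pub struct SketchConfig {
    pub broker_address: String,
    pub token: Option<String>,
    pub connect_timeout: Duration,
}

/// Collects optional settings; defaults are filled in by `build`.
#[derive(Debug, Default)]
pub struct SketchConfigBuilder {
    broker_address: Option<String>,
    token: Option<String>,
    connect_timeout: Option<Duration>,
}

impl SketchConfig {
    /// Entry point, mirroring the `Type::builder()` convention.
    pub fn builder() -> SketchConfigBuilder {
        SketchConfigBuilder::default()
    }
}

impl SketchConfigBuilder {
    // Each setter takes `self` by value and returns it, which allows chaining.
    pub fn broker_address(mut self, addr: &str) -> Self {
        self.broker_address = Some(addr.to_string());
        self
    }

    pub fn token(mut self, token: &str) -> Self {
        self.token = Some(token.to_string());
        self
    }

    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = Some(timeout);
        self
    }

    /// Consumes the builder and applies the (assumed) defaults.
    pub fn build(self) -> SketchConfig {
        SketchConfig {
            broker_address: self
                .broker_address
                .unwrap_or_else(|| "localhost:7070".to_string()),
            token: self.token,
            connect_timeout: self.connect_timeout.unwrap_or(Duration::from_secs(5)),
        }
    }
}
```

A call such as SketchConfig::builder().broker_address("example:1234").token("t").build() yields a fully populated value, while settings that were never supplied fall back to the defaults chosen in build.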
Establishing a Connection
You create a FlowClient by building a ClientConfig and passing it to FlowClient::new. All operations are async.
Configuration and Creation
use flowmq_sdk::{FlowClient, ClientConfig, Authentication};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();

    let client = FlowClient::new(config).await?;
    println!("Successfully connected to FlowMQ!");

    // Your application logic here

    client.close().await?;
    Ok(())
}

Replace YOUR_API_TOKEN with your actual credentials.
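Rather than pasting a real token into source code, one option is to read it from the environment at startup. The sketch below uses only the standard library; the variable name FLOWMQ_API_TOKEN, the TokenError type, and both helper functions are assumptions made for this example and are not part of the SDK.

```rust
use std::env;
use std::fmt;

/// Hypothetical environment variable name chosen for this sketch.
pub const TOKEN_VAR: &str = "FLOWMQ_API_TOKEN";

/// Reasons a token could not be obtained.
#[derive(Debug, PartialEq)]
pub enum TokenError {
    Missing,
    Empty,
}

impl fmt::Display for TokenError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            TokenError::Missing => write!(f, "{} is not set", TOKEN_VAR),
            TokenError::Empty => write!(f, "{} is set but empty", TOKEN_VAR),
        }
    }
}

impl std::error::Error for TokenError {}

/// Validates a raw value: rejects absent or blank input and trims whitespace.
pub fn validate_token(raw: Option<String>) -> Result<String, TokenError> {
    match raw {
        None => Err(TokenError::Missing),
        Some(value) => {
            let trimmed = value.trim();
            if trimmed.is_empty() {
                Err(TokenError::Empty)
            } else {
                Ok(trimmed.to_string())
            }
        }
    }
}

/// Reads the token from the process environment and validates it.
pub fn token_from_env() -> Result<String, TokenError> {
    validate_token(env::var(TOKEN_VAR).ok())
}
```

Because TokenError implements std::error::Error, token_from_env()? works inside a main that returns Result<(), Box<dyn Error>>, and the resulting string can be passed to Authentication::token in place of the literal.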
Publishing Messages
To publish messages, create a Producer from the client. The publish method is an async function that returns a Result.
Example: Publishing a Message to a Topic
use flowmq_sdk::{FlowClient, ClientConfig, Authentication, Message};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();

    let client = FlowClient::new(config).await?;
    let producer = client.new_producer();

    let message = Message::builder()
        .payload(b"Hello from Rust!")
        .header("content-type", "text/plain")
        .build();

    match producer.publish("sensor.v1.data", message).await {
        Ok(receipt) => {
            println!("Message published successfully! ID: {}", receipt.message_id());
        }
        Err(e) => {
            eprintln!("Failed to publish message: {}", e);
        }
    }

    client.close().await?;
    Ok(())
}

Consuming Messages
You can consume messages from a Queue or a Stream by calling consume with the source and an async message handler closure. The consume method runs the handler for each received message; the consume call itself is often run as a background task.
Consuming from a Queue
When consuming from a queue, you must explicitly ack() each message after successful processing.
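To illustrate why the acknowledgement matters, here is a toy in-memory model of a queue. It assumes that a message which is not acknowledged stays on the queue and is delivered again later, which is common queue behavior but is not stated in this guide for FlowMQ. ToyQueue and deliver_pass are hypothetical names; nothing here talks to a broker.

```rust
use std::collections::VecDeque;

/// Toy queue: a message leaves the queue only when its handler acknowledges it.
pub struct ToyQueue {
    pending: VecDeque<String>,
}

impl ToyQueue {
    pub fn new(messages: &[&str]) -> Self {
        ToyQueue {
            pending: messages.iter().map(|m| m.to_string()).collect(),
        }
    }

    /// Delivers every currently pending message to `handler` exactly once.
    /// A `true` return value stands for a successful ack; on `false` the
    /// message is put back for a later pass. Returns the number of acks.
    pub fn deliver_pass<F>(&mut self, mut handler: F) -> usize
    where
        F: FnMut(&str) -> bool,
    {
        let mut acked = 0;
        // The range is computed once, so re-queued messages wait for the next pass.
        for _ in 0..self.pending.len() {
            let msg = self.pending.pop_front().expect("length checked above");
            if handler(&msg) {
                acked += 1;
            } else {
                self.pending.push_back(msg);
            }
        }
        acked
    }

    /// Number of messages still waiting for an ack.
    pub fn pending(&self) -> usize {
        self.pending.len()
    }
}
```

In this model, a handler that fails on one of three messages leaves that message pending after the first pass, and a later pass can pick it up again. The SDK example that follows applies the same rule by calling message.ack() only after processing has finished.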
use flowmq_sdk::{FlowClient, ClientConfig, Authentication, Queue};
use std::error::Error;
use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();

    let client = FlowClient::new(config).await?;
    let task_queue = Queue::new("image.processing.tasks");
    println!("Starting consumer on queue: {}...", task_queue.name());

    // The consume method takes an async closure as a handler
    let consume_future = client.consume(task_queue, |message| async move {
        println!("Received from queue: {}", String::from_utf8_lossy(message.payload()));
        // Process message...
        println!("Processing complete. Acknowledging message.");
        if let Err(e) = message.ack().await {
            eprintln!("Failed to ack message: {}", e);
        }
    });

    // Run the consumer until Ctrl-C is pressed
    tokio::select! {
        res = consume_future => {
            if let Err(e) = res {
                eprintln!("Consumer error: {}", e);
            }
        },
        _ = signal::ctrl_c() => {
            println!("Consumer stopped by user.");
        }
    }

    client.close().await?;
    Ok(())
}

Consuming from a Stream
When consuming from a stream, you commit offsets to track progress.
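The idea of tracking progress through committed offsets can be shown with a toy append-only log. This sketch assumes that a commit records "everything up to and including this offset has been processed" and that reading resumes just after the last committed offset; the guide does not spell out FlowMQ's exact semantics, and ToyStream with its methods is a hypothetical type for illustration only.

```rust
/// Toy append-only log with a single consumer's committed position.
pub struct ToyStream {
    records: Vec<String>,
    /// Offset of the next record to read after a restart.
    committed: usize,
}

impl ToyStream {
    pub fn new(records: &[&str]) -> Self {
        ToyStream {
            records: records.iter().map(|r| r.to_string()).collect(),
            committed: 0,
        }
    }

    /// Returns up to `max` (offset, record) pairs starting at the committed
    /// position. Polling alone never advances the position.
    pub fn poll(&self, max: usize) -> Vec<(usize, String)> {
        self.records
            .iter()
            .enumerate()
            .skip(self.committed)
            .take(max)
            .map(|(offset, record)| (offset, record.clone()))
            .collect()
    }

    /// Marks everything up to and including `offset` as processed.
    /// Commits never move the position backwards or past the end of the log.
    pub fn commit(&mut self, offset: usize) {
        let next = (offset + 1).min(self.records.len());
        if next > self.committed {
            self.committed = next;
        }
    }

    pub fn committed(&self) -> usize {
        self.committed
    }
}
```

Polling twice without a commit returns the same records both times, which is what a consumer would see if it crashed before committing. The SDK example that follows commits after each message via message.commit_offset().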
use flowmq_sdk::{FlowClient, ClientConfig, Authentication, Stream, StartPosition};
use std::error::Error;
use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();

    let client = FlowClient::new(config).await?;
    let event_stream = Stream::new("user.activity.events", StartPosition::Earliest);
    println!("Starting consumer on stream: {}...", event_stream.name());

    let consume_future = client.consume(event_stream, |message| async move {
        println!(
            "Received from stream (offset: {}): {}",
            message.offset(),
            String::from_utf8_lossy(message.payload())
        );
        // Process message...
        println!("Processing complete. Committing offset.");
        if let Err(e) = message.commit_offset().await {
            eprintln!("Failed to commit offset: {}", e);
        }
    });

    tokio::select! {
        res = consume_future => {
            if let Err(e) = res {
                eprintln!("Consumer error: {}", e);
            }
        },
        _ = signal::ctrl_c() => {
            println!("Consumer stopped by user.");
        }
    }

    client.close().await?;
    Ok(())
}

Full End-to-End Example
This example combines publishing and consuming using tokio::spawn to run the consumer as a background task.
use flowmq_sdk::{
    Authentication, ClientConfig, FlowClient, Message, Queue,
};
use std::error::Error;
use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();

    // Use an Arc to share the client between tasks
    let client = Arc::new(FlowClient::new(config).await?);

    // 1. Consumer Setup (run in background)
    // (Assumes 'e2e.topic' is routed to 'e2e.queue' in FlowMQ)
    let consumer_client = client.clone();
    let message_received_notify = Arc::new(Notify::new());
    let notify_clone = message_received_notify.clone();

    let consumer_handle = tokio::spawn(async move {
        let e2e_queue = Queue::new("e2e.queue");
        let result = consumer_client.consume(e2e_queue, move |message| {
            let notify = notify_clone.clone();
            async move {
                println!("Received message: {}", String::from_utf8_lossy(message.payload()));
                message.ack().await.unwrap();
                notify.notify_one(); // Signal that the message was received
            }
        }).await;

        if let Err(e) = result {
            eprintln!("Consumer exited with error: {}", e);
        }
    });
    println!("Consumer started and waiting for messages...");

    // 2. Producer: Publish a message
    let producer = client.new_producer();
    let message = Message::builder().payload(b"End-to-End Test Message!").build();
    producer.publish("e2e.topic", message).await?;
    println!("Message published successfully.");

    // 3. Wait for the consumer to receive the message
    message_received_notify.notified().await;
    println!("Successfully received and acknowledged message.");

    // Cleanly shut down
    consumer_handle.abort();
    client.close().await?;
    Ok(())
}

Additional Resources
- Error Handling: Embrace Rust's Result type. Use ? for concise error propagation and match or if let for granular error handling.
- Concurrency: Use Arc to share clients and other state safely across tokio tasks. Use channels or Notify for task communication.
- Advanced Configuration: Explore the ClientConfig::builder() for more options like timeouts and TLS.
- Flow SDK Overview: For a broader understanding of the SDK's architecture, refer to the Flow SDK Developer Guide.
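As a small illustration of the error-handling advice above, the sketch below propagates a failure with ? and then handles each case separately with match. It uses only the standard library. The "name=value" payload format, the ReadingError type, and both functions are hypothetical examples and are unrelated to the SDK's own error types.

```rust
use std::fmt;
use std::num::ParseIntError;

/// Errors that can occur while decoding a payload in this sketch.
#[derive(Debug)]
pub enum ReadingError {
    Malformed(String),
    BadValue(ParseIntError),
}

impl fmt::Display for ReadingError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            ReadingError::Malformed(ref raw) => write!(f, "malformed payload: {}", raw),
            ReadingError::BadValue(ref e) => write!(f, "invalid value: {}", e),
        }
    }
}

impl std::error::Error for ReadingError {}

// This conversion is what lets `?` turn a ParseIntError into a ReadingError.
impl From<ParseIntError> for ReadingError {
    fn from(e: ParseIntError) -> Self {
        ReadingError::BadValue(e)
    }
}

/// Parses a payload of the assumed form "name=value".
pub fn parse_reading(payload: &str) -> Result<(String, i64), ReadingError> {
    let mut parts = payload.splitn(2, '=');
    let name = parts.next().unwrap_or("");
    let value = match parts.next() {
        Some(v) => v,
        None => return Err(ReadingError::Malformed(payload.to_string())),
    };
    if name.is_empty() {
        return Err(ReadingError::Malformed(payload.to_string()));
    }
    // Concise propagation: a parse failure returns early as BadValue.
    let value: i64 = value.trim().parse()?;
    Ok((name.to_string(), value))
}

/// Granular handling: each outcome gets its own branch.
pub fn describe(payload: &str) -> String {
    match parse_reading(payload) {
        Ok((name, value)) => format!("{} = {}", name, value),
        Err(ReadingError::Malformed(raw)) => format!("skipping malformed payload: {}", raw),
        Err(ReadingError::BadValue(e)) => format!("bad value: {}", e),
    }
}
```

The same split applies to SDK calls: use ? where the caller should decide what to do with a failure, and match or if let where the code at hand can recover or log and continue.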