Connect FlowMQ with the Rust SDK
The Rust bindings of the Flow SDK are built on the tokio runtime and are designed for performance, safety, and ergonomic asynchronous programming with async/await.
Prerequisites
- Rust 1.56+: The SDK uses modern async features.
- Cargo: For dependency management.
- Tokio Runtime: The examples assume you are using tokio as your async runtime.
- Running FlowMQ Instance: Access to a FlowMQ broker, assumed to be at localhost:7070.
Installation
The Rust Flow SDK is available on crates.io. Add it to your project using cargo:
cargo add flowmq-sdk
You will also need tokio if you don't already have it:
cargo add tokio --features full
Core Concepts
- FlowClient: The main client for interacting with FlowMQ. It manages the underlying gRPC connection and is designed to be created once and shared (e.g., via Arc).
- ClientConfig: A configuration struct, created with a builder pattern, to set up the FlowClient.
- Authentication: A helper for providing credentials.
- Producer: An object created from the FlowClient for publishing messages.
- Consumer: An object responsible for receiving messages.
- Message: A struct representing a message, built using a builder pattern.
- Queue & Stream: Structs defining the source you want to consume from.
- async/await: Used for all I/O-bound operations.
- Result<T, E>: Used for all operations that can fail, promoting robust error handling.
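The builder pattern mentioned for ClientConfig and Message can be sketched with plain Rust and no dependencies. The struct, field, and method names below (including the token setter and the default address fallback) are hypothetical stand-ins chosen for illustration; they are not the flowmq_sdk definitions.

```rust
// Hypothetical stand-in types; NOT the flowmq_sdk definitions.
#[derive(Debug, Clone, PartialEq)]
struct ClientConfig {
    broker_address: String,
    token: Option<String>,
}

#[derive(Debug, Default)]
struct ClientConfigBuilder {
    broker_address: Option<String>,
    token: Option<String>,
}

impl ClientConfig {
    // Entry point: start with an empty builder.
    fn builder() -> ClientConfigBuilder {
        ClientConfigBuilder::default()
    }
}

impl ClientConfigBuilder {
    // Each setter takes `self` by value and returns it, which enables chaining.
    fn broker_address(mut self, addr: &str) -> Self {
        self.broker_address = Some(addr.to_string());
        self
    }

    fn token(mut self, token: &str) -> Self {
        self.token = Some(token.to_string());
        self
    }

    // Consume the builder; fall back to an assumed default address if unset.
    fn build(self) -> ClientConfig {
        ClientConfig {
            broker_address: self
                .broker_address
                .unwrap_or_else(|| "localhost:7070".to_string()),
            token: self.token,
        }
    }
}

fn main() {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .token("YOUR_API_TOKEN")
        .build();
    println!("{:?}", config);
}
```

Taking self by value in each setter keeps the call chain free of mutable borrows, and build() is the single place where defaults are applied.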
Establishing a Connection
You create a FlowClient by building a ClientConfig and passing it to FlowClient::new. All operations are async.
Configuration and Creation
use flowmq_sdk::{FlowClient, ClientConfig, Authentication};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();

    let client = FlowClient::new(config).await?;
    println!("Successfully connected to FlowMQ!");

    // Your application logic here

    client.close().await?;
    Ok(())
}
Replace YOUR_API_TOKEN with your actual credentials.
Publishing Messages
To publish messages, create a Producer from the client. The publish method is an async function that returns a Result.
Example: Publishing a Message to a Topic
use flowmq_sdk::{FlowClient, ClientConfig, Authentication, Message};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();

    let client = FlowClient::new(config).await?;
    let producer = client.new_producer();

    let message = Message::builder()
        .payload(b"Hello from Rust!")
        .header("content-type", "text/plain")
        .build();

    match producer.publish("sensor.v1.data", message).await {
        Ok(receipt) => {
            println!("Message published successfully! ID: {}", receipt.message_id());
        }
        Err(e) => {
            eprintln!("Failed to publish message: {}", e);
        }
    }

    client.close().await?;
    Ok(())
}
Consuming Messages
You can consume messages from a Queue or a Stream by calling consume and passing an async message handler closure. The consume method runs the handler once for each received message, and the future it returns is typically driven as a background task.
Consuming from a Queue
When consuming from a queue, you must explicitly ack() each message after successful processing.
use flowmq_sdk::{FlowClient, ClientConfig, Authentication, Queue};
use std::error::Error;
use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();

    let client = FlowClient::new(config).await?;
    let task_queue = Queue::new("image.processing.tasks");
    println!("Starting consumer on queue: {}...", task_queue.name());

    // The consume method takes an async closure as a handler
    let consume_future = client.consume(task_queue, |message| async move {
        println!("Received from queue: {}", String::from_utf8_lossy(message.payload()));
        // Process message...
        println!("Processing complete. Acknowledging message.");
        if let Err(e) = message.ack().await {
            eprintln!("Failed to ack message: {}", e);
        }
    });

    // Run the consumer until Ctrl-C is pressed
    tokio::select! {
        res = consume_future => {
            if let Err(e) = res {
                eprintln!("Consumer error: {}", e);
            }
        },
        _ = signal::ctrl_c() => {
            println!("Consumer stopped by user.");
        }
    }

    client.close().await?;
    Ok(())
}
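To illustrate why the ack() call comes after processing, here is a toy in-memory model of queue delivery. It is a sketch only, not FlowMQ's implementation: ToyQueue and deliver are hypothetical names, and the model assumes that a message which is not acknowledged stays available for another delivery attempt, which is typical queue behavior but is not stated in this guide.

```rust
use std::collections::VecDeque;

// Toy in-memory queue; a hypothetical model, not FlowMQ's implementation.
struct ToyQueue {
    pending: VecDeque<String>,
}

impl ToyQueue {
    fn new(messages: &[&str]) -> Self {
        ToyQueue {
            pending: messages.iter().map(|m| m.to_string()).collect(),
        }
    }

    // Deliver the front message to `handler`. The message is removed only if
    // the handler reports success (the "ack"); otherwise it stays queued.
    // Returns None when the queue is empty, otherwise Some(acked).
    fn deliver<F>(&mut self, handler: F) -> Option<bool>
    where
        F: FnOnce(&str) -> bool,
    {
        let message = self.pending.front()?.clone();
        let acked = handler(message.as_str());
        if acked {
            self.pending.pop_front();
        }
        Some(acked)
    }
}

fn main() {
    let mut queue = ToyQueue::new(&["resize image 1"]);
    // First attempt fails: no ack, so the message stays queued.
    let first = queue.deliver(|_msg| false);
    // Second attempt succeeds and acks, so the message is removed.
    let second = queue.deliver(|_msg| true);
    println!(
        "first acked: {:?}, second acked: {:?}, remaining: {}",
        first,
        second,
        queue.pending.len()
    );
}
```

Acknowledging only after the work is done means a failure during processing does not silently drop the message in this model.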
Consuming from a Stream
When consuming from a stream, you commit offsets to track progress.
use flowmq_sdk::{FlowClient, ClientConfig, Authentication, Stream, StartPosition};
use std::error::Error;
use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();

    let client = FlowClient::new(config).await?;
    let event_stream = Stream::new("user.activity.events", StartPosition::Earliest);
    println!("Starting consumer on stream: {}...", event_stream.name());

    let consume_future = client.consume(event_stream, |message| async move {
        println!(
            "Received from stream (offset: {}): {}",
            message.offset(),
            String::from_utf8_lossy(message.payload())
        );
        // Process message...
        println!("Processing complete. Committing offset.");
        if let Err(e) = message.commit_offset().await {
            eprintln!("Failed to commit offset: {}", e);
        }
    });

    // Run the consumer until Ctrl-C is pressed
    tokio::select! {
        res = consume_future => {
            if let Err(e) = res {
                eprintln!("Consumer error: {}", e);
            }
        },
        _ = signal::ctrl_c() => {
            println!("Consumer stopped by user.");
        }
    }

    client.close().await?;
    Ok(())
}
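The idea of committing offsets to track progress can be shown with a toy append-only log. This is a minimal sketch under stated assumptions, not the SDK's behavior: ToyStream and its consume method are hypothetical, and the model assumes a restarted consumer resumes from the last committed offset.

```rust
// Toy append-only log with one committed offset; a hypothetical model,
// not FlowMQ's stream implementation.
struct ToyStream {
    log: Vec<String>,
    committed: usize, // offset of the next record to read after a restart
}

impl ToyStream {
    fn new(records: &[&str]) -> Self {
        ToyStream {
            log: records.iter().map(|r| r.to_string()).collect(),
            committed: 0,
        }
    }

    // Process records starting at the committed offset. `handler` returns
    // true on success; the offset advances only after a successful call.
    // Stops at the first failure and returns how many records were processed.
    fn consume<F>(&mut self, mut handler: F) -> usize
    where
        F: FnMut(usize, &str) -> bool,
    {
        let mut processed = 0;
        while self.committed < self.log.len() {
            let offset = self.committed;
            if !handler(offset, self.log[offset].as_str()) {
                break;
            }
            self.committed = offset + 1;
            processed += 1;
        }
        processed
    }
}

fn main() {
    let mut stream = ToyStream::new(&["login", "click", "logout"]);
    // Simulate a failure while handling offset 1: only offset 0 is committed.
    let first_run = stream.consume(|offset, _record| offset != 1);
    // A second run resumes from the committed offset (1), not from 0.
    let second_run = stream.consume(|_offset, _record| true);
    println!(
        "first run: {}, second run: {}, committed: {}",
        first_run, second_run, stream.committed
    );
}
```

Because the offset moves only after the handler succeeds, the record that failed is seen again on the next run in this model.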
Full End-to-End Example
This example combines publishing and consuming using tokio::spawn to run the consumer as a background task.
use flowmq_sdk::{
    Authentication, ClientConfig, FlowClient, Message, Queue,
};
use std::error::Error;
use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfig::builder()
        .broker_address("localhost:7070")
        .authentication(Authentication::token("YOUR_API_TOKEN"))
        .build();

    // Use an Arc to share the client between tasks
    let client = Arc::new(FlowClient::new(config).await?);

    // 1. Consumer Setup (run in background)
    // (Assumes 'e2e.topic' is routed to 'e2e.queue' in FlowMQ)
    let consumer_client = client.clone();
    let message_received_notify = Arc::new(Notify::new());
    let notify_clone = message_received_notify.clone();

    let consumer_handle = tokio::spawn(async move {
        let e2e_queue = Queue::new("e2e.queue");
        let result = consumer_client.consume(e2e_queue, move |message| {
            let notify = notify_clone.clone();
            async move {
                println!("Received message: {}", String::from_utf8_lossy(message.payload()));
                message.ack().await.unwrap();
                notify.notify_one(); // Signal that the message was received
            }
        }).await;
        if let Err(e) = result {
            eprintln!("Consumer exited with error: {}", e);
        }
    });
    println!("Consumer started and waiting for messages...");

    // 2. Producer: Publish a message
    let producer = client.new_producer();
    let message = Message::builder().payload(b"End-to-End Test Message!").build();
    producer.publish("e2e.topic", message).await?;
    println!("Message published successfully.");

    // 3. Wait for the consumer to receive the message
    message_received_notify.notified().await;
    println!("Successfully received and acknowledged message.");

    // Cleanly shut down
    consumer_handle.abort();
    client.close().await?;
    Ok(())
}
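The sharing pattern used above (one client behind an Arc, plus a signal back to the main task) can be tried without a broker. The sketch below deliberately swaps tokio tasks and Notify for std threads and an mpsc channel so that it has no dependencies; SharedClient and run_workers are hypothetical names, not part of the SDK.

```rust
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for a shared client; not the SDK's FlowClient.
struct SharedClient {
    name: String,
}

// Spawn `workers` threads that share one client via Arc and report back
// over a channel. Reports are sorted so the result is deterministic.
fn run_workers(workers: usize) -> Vec<String> {
    let client = Arc::new(SharedClient {
        name: "flow-client".to_string(),
    });
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();

    for id in 0..workers {
        // Each worker gets its own Arc handle and its own sender.
        let client = Arc::clone(&client);
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            tx.send(format!("worker {} used {}", id, client.name)).unwrap();
        }));
    }

    // Drop the original sender so the channel closes once all workers finish.
    drop(tx);
    for handle in handles {
        handle.join().unwrap();
    }

    let mut reports: Vec<String> = rx.iter().collect();
    reports.sort();
    reports
}

fn main() {
    for report in run_workers(3) {
        println!("{}", report);
    }
}
```

Arc::clone only bumps a reference count, so every worker uses the same underlying client value rather than a copy.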
Additional Resources
- Error Handling: Embrace Rust's Result type. Use ? for concise error propagation and match or if let for granular error handling.
- Concurrency: Use Arc to share clients and other state safely across tokio tasks. Use channels or Notify for task communication.
- Advanced Configuration: Explore the ClientConfig::builder() for more options like timeouts and TLS.
- Flow SDK Overview: For a broader understanding of the SDK's architecture, refer to the Flow SDK Developer Guide.
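The error-handling advice in the list above can be sketched in plain Rust. PublishError, MAX_PAYLOAD, validate, and publish are hypothetical names invented for this illustration; the SDK's real error types and limits may differ.

```rust
use std::fmt;

// Hypothetical error type for illustration; the SDK's real errors may differ.
#[derive(Debug, PartialEq)]
enum PublishError {
    EmptyTopic,
    PayloadTooLarge(usize),
}

impl fmt::Display for PublishError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PublishError::EmptyTopic => write!(f, "topic must not be empty"),
            PublishError::PayloadTooLarge(n) => write!(f, "payload of {} bytes is too large", n),
        }
    }
}

impl std::error::Error for PublishError {}

// Assumed limit, chosen only to make the example small.
const MAX_PAYLOAD: usize = 16;

fn validate(topic: &str, payload: &[u8]) -> Result<(), PublishError> {
    if topic.is_empty() {
        return Err(PublishError::EmptyTopic);
    }
    if payload.len() > MAX_PAYLOAD {
        return Err(PublishError::PayloadTooLarge(payload.len()));
    }
    Ok(())
}

// `?` returns early with the error from validate; on success it continues.
fn publish(topic: &str, payload: &[u8]) -> Result<usize, PublishError> {
    validate(topic, payload)?;
    Ok(payload.len())
}

fn main() {
    // `match` handles each outcome separately.
    match publish("sensor.v1.data", b"hello") {
        Ok(n) => println!("published {} bytes", n),
        Err(PublishError::PayloadTooLarge(n)) => eprintln!("{} bytes exceeds the limit", n),
        Err(e) => eprintln!("publish failed: {}", e),
    }
    // `if let` is enough when only the failure case matters.
    if let Err(e) = publish("", b"hello") {
        eprintln!("publish failed: {}", e);
    }
}
```

Implementing std::error::Error on the custom type lets it be returned through Box<dyn Error>, as the main functions in this guide do.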