Connect FlowMQ with the Go SDK
This guide provides a comprehensive walkthrough for connecting your Go applications to FlowMQ using the official Flow SDK. The Go bindings are designed to be idiomatic, leveraging goroutines, channels, and context for concurrent and robust messaging.
Prerequisites
- Go 1.18+: The SDK uses modern Go features and modules.
- Running FlowMQ Instance: You need access to a FlowMQ broker. For this guide, we'll assume it's running at
localhost:7070
.
Installation
The Go Flow SDK can be added to your project using go get
:
go get github.com/flowmq/flowmq-sdk-go
Then, import it into your Go files:
import "github.com/flowmq/flowmq-sdk-go"
Core Concepts
FlowClient
: The main client for interacting with FlowMQ. It manages the underlying gRPC connection and is safe for concurrent use.ClientConfig
: A struct used to configure theFlowClient
.Authentication
: An interface for providing credentials. UseNewTokenAuth
for token-based authentication.Producer
: An object created from theFlowClient
for publishing messages.Consumer
: An object responsible for receiving messages.Message
: A struct representing a message, containing aPayload
([]byte
) andHeaders
(map[string]string
).Queue
&Stream
: Structs that define the source you want to consume from.context.Context
: Used extensively for managing deadlines, cancellations, and request-scoped values.
Establishing a Connection
You create a FlowClient
by passing a ClientConfig
struct to flowmq.NewClient
. It's essential to Close()
the client when your application is done to release resources.
Configuration and Creation
package main
import (
"context"
"fmt"
"github.com/flowmq/flowmq-sdk-go"
)
func main() {
config := flowmq.ClientConfig{
BrokerAddress: "localhost:7070",
Authentication: flowmq.NewTokenAuth("YOUR_API_TOKEN"),
}
client, err := flowmq.NewClient(context.Background(), config)
if err != nil {
panic(fmt.Sprintf("Failed to create client: %v", err))
}
defer client.Close()
fmt.Println("Successfully connected to FlowMQ!")
// Your application logic here
}
Replace YOUR_API_TOKEN
with your actual credentials.
Publishing Messages
To publish messages, create a Producer
from the client. The Publish
method is an asynchronous operation. You can provide a callback to handle the result.
Example: Publishing a Message to a Topic
package main
import (
"context"
"fmt"
"sync"
"github.com/flowmq/flowmq-sdk-go"
)
func main() {
config := flowmq.ClientConfig{
BrokerAddress: "localhost:7070",
Authentication: flowmq.NewTokenAuth("YOUR_API_TOKEN"),
}
client, err := flowmq.NewClient(context.Background(), config)
if err != nil {
panic(err)
}
defer client.Close()
producer, err := client.NewProducer()
if err != nil {
panic(err)
}
message := &flowmq.Message{
Payload: []byte("Hello from Go!"),
Headers: map[string]string{"content-type": "text/plain"},
}
// Use a WaitGroup to wait for the async publish to complete
var wg sync.WaitGroup
wg.Add(1)
// PublishAsync sends the message and invokes the callback on completion
producer.PublishAsync("sensor.v1.data", message, func(receipt *flowmq.PublishReceipt, err error) {
defer wg.Done()
if err != nil {
fmt.Printf("Failed to publish message: %v\n", err)
return
}
fmt.Printf("Message published successfully! ID: %s\n", receipt.MessageID)
})
wg.Wait()
}
Consuming Messages
You can consume messages from a Queue or a Stream by calling Consume
and passing a MessageHandler
function. The Consume
method blocks and runs the handler in a dedicated goroutine until the provided context
is canceled.
Consuming from a Queue
When consuming from a queue, you must explicitly Ack()
each message after successful processing.
package main
import (
"context"
"fmt"
"os/signal"
"syscall"
"github.com/flowmq/flowmq-sdk-go"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
config := flowmq.ClientConfig{
BrokerAddress: "localhost:7070",
Authentication: flowmq.NewTokenAuth("YOUR_API_TOKEN"),
}
client, err := flowmq.NewClient(ctx, config)
if err != nil {
panic(err)
}
defer client.Close()
taskQueue := flowmq.NewQueue("image.processing.tasks")
handler := func(ctx context.Context, msg *flowmq.Message) error {
fmt.Printf("Received from queue: %s\n", string(msg.Payload))
// Process message...
fmt.Println("Processing complete. Acknowledging message.")
return msg.Ack(ctx)
}
fmt.Printf("Starting consumer on queue: %s...\n", taskQueue.Name)
if err := client.Consume(ctx, taskQueue, handler); err != nil {
fmt.Printf("Consumer error: %v\n", err)
}
fmt.Println("Consumer stopped.")
}
Consuming from a Stream
When consuming from a stream, you commit offsets to track progress.
package main
import (
"context"
"fmt"
"os/signal"
"syscall"
"github.com/flowmq/flowmq-sdk-go"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
config := flowmq.ClientConfig{
BrokerAddress: "localhost:7070",
Authentication: flowmq.NewTokenAuth("YOUR_API_TOKEN"),
}
client, err := flowmq.NewClient(ctx, config)
if err != nil {
panic(err)
}
defer client.Close()
eventStream := flowmq.NewStream("user.activity.events", flowmq.StartPositionEarliest)
handler := func(ctx context.Context, msg *flowmq.Message) error {
fmt.Printf("Received from stream (offset: %d): %s\n", msg.Offset, string(msg.Payload))
// Process message...
fmt.Println("Processing complete. Committing offset.")
return msg.CommitOffset(ctx)
}
fmt.Printf("Starting consumer on stream: %s...\n", eventStream.Name)
if err := client.Consume(ctx, eventStream, handler); err != nil {
fmt.Printf("Consumer error: %v\n", err)
}
fmt.Println("Consumer stopped.")
}
Full End-to-End Example
This example combines publishing and consuming to demonstrate a complete workflow.
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/flowmq/flowmq-sdk-go"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
config := flowmq.ClientConfig{
BrokerAddress: "localhost:7070",
Authentication: flowmq.NewTokenAuth("YOUR_API_TOKEN"),
}
client, err := flowmq.NewClient(ctx, config)
if err != nil {
panic(err)
}
defer client.Close()
var wg sync.WaitGroup
wg.Add(1)
// 1. Consumer Setup (run in a goroutine)
// (Assumes 'e2e.topic' is routed to 'e2e.queue' in FlowMQ)
go func() {
e2eQueue := flowmq.NewQueue("e2e.queue")
handler := func(ctx context.Context, msg *flowmq.Message) error {
fmt.Printf("Received message: %s\n", string(msg.Payload))
err := msg.Ack(ctx)
wg.Done() // Signal that the message has been received and processed
return err
}
fmt.Println("Consumer started and waiting for messages...")
if err := client.Consume(ctx, e2eQueue, handler); err != nil {
fmt.Printf("Consumer exited with error: %v\n", err)
}
}()
// Give the consumer a moment to start up
time.Sleep(1 * time.Second)
// 2. Producer: Publish a message
producer, err := client.NewProducer()
if err != nil {
panic(err)
}
message := &flowmq.Message{Payload: []byte("End-to-End Test Message!")}
producer.PublishAsync("e2e.topic", message, func(receipt *flowmq.PublishReceipt, err error){
if err != nil {
fmt.Printf("Publish failed: %v\n", err)
wg.Done() // Unblock if publish fails
} else {
fmt.Printf("Message published successfully. ID: %s\n", receipt.MessageID)
}
})
// 3. Wait for the consumer to receive the message or timeout
wg.Wait()
fmt.Println("Example finished.")
}
Additional Resources
- Error Handling: Check errors returned from all functions. The
Consume
method returns an error if the context is canceled or a non-recoverable error occurs. - Context Management: Practice using
context
to control the lifecycle of your consumers and other operations. - Advanced Configuration: Explore the
ClientConfig
for more options like timeouts and TLS. - Flow SDK Overview: For a broader understanding of the SDK's architecture and concepts, refer to the Flow SDK Developer Guide.