Connect FlowMQ with the Go SDK

This guide walks through connecting your Go applications to FlowMQ using the official Flow SDK. The Go bindings are designed to be idiomatic: they rely on goroutines, channels, and context for concurrent, robust messaging.

Prerequisites

  • Go 1.18+: The SDK uses modern Go features and modules.
  • Running FlowMQ Instance: You need access to a FlowMQ broker. For this guide, we'll assume it's running at localhost:7070.

Installation

The Go Flow SDK can be added to your project using go get:

shell
go get github.com/flowmq/flowmq-sdk-go

Then, import it into your Go files:

go
import "github.com/flowmq/flowmq-sdk-go"

Core Concepts

  • FlowClient: The main client for interacting with FlowMQ. It manages the underlying gRPC connection and is safe for concurrent use.
  • ClientConfig: A struct used to configure the FlowClient.
  • Authentication: An interface for providing credentials. Use NewTokenAuth for token-based authentication.
  • Producer: An object created from the FlowClient for publishing messages.
  • Consumer: An object responsible for receiving messages.
  • Message: A struct representing a message, containing a Payload ([]byte) and Headers (map[string]string).
  • Queue & Stream: Structs that define the source you want to consume from.
  • context.Context: Used extensively for managing deadlines, cancellations, and request-scoped values.

Establishing a Connection

You create a FlowClient by passing a ClientConfig struct to flowmq.NewClient. Call Close() on the client when your application is done so that the underlying connection and its resources are released.

Configuration and Creation

go
package main

import (
	"context"
	"fmt"
	"github.com/flowmq/flowmq-sdk-go"
)

func main() {
	config := flowmq.ClientConfig{
		BrokerAddress: "localhost:7070",
		Authentication: flowmq.NewTokenAuth("YOUR_API_TOKEN"),
	}

	client, err := flowmq.NewClient(context.Background(), config)
	if err != nil {
		panic(fmt.Sprintf("Failed to create client: %v", err))
	}
	defer client.Close()

	fmt.Println("Successfully connected to FlowMQ!")
	// Your application logic here
}

Replace YOUR_API_TOKEN with your actual credentials.
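
Rather than hard-coding the token in source, you may prefer to read it from the environment. The following is a minimal sketch using only the standard library. The variable name FLOWMQ_API_TOKEN and the helper tokenFromEnv are assumptions made for illustration; neither is defined by the SDK.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

// tokenEnvVar is a hypothetical variable name chosen for this sketch.
const tokenEnvVar = "FLOWMQ_API_TOKEN"

// tokenFromEnv returns the trimmed token, or an error if it is unset or blank.
// The lookup function is injected so the helper can be tested without
// touching the real environment; pass os.LookupEnv in application code.
func tokenFromEnv(lookup func(string) (string, bool)) (string, error) {
	raw, ok := lookup(tokenEnvVar)
	token := strings.TrimSpace(raw)
	if !ok || token == "" {
		return "", errors.New(tokenEnvVar + " is not set")
	}
	return token, nil
}

func main() {
	token, err := tokenFromEnv(os.LookupEnv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	// The token would then be passed to flowmq.NewTokenAuth(token).
	fmt.Printf("loaded a token of %d characters\n", len(token))
}
```

Failing fast on a missing token keeps the error close to its cause instead of surfacing later as an authentication failure.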

Publishing Messages

To publish messages, create a Producer from the client. The PublishAsync method is asynchronous: it returns immediately and invokes a callback you provide with either a publish receipt or an error.

Example: Publishing a Message to a Topic

go
package main

import (
	"context"
	"fmt"
	"sync"
	"github.com/flowmq/flowmq-sdk-go"
)

func main() {
	config := flowmq.ClientConfig{
		BrokerAddress: "localhost:7070",
		Authentication: flowmq.NewTokenAuth("YOUR_API_TOKEN"),
	}

	client, err := flowmq.NewClient(context.Background(), config)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	producer, err := client.NewProducer()
	if err != nil {
		panic(err)
	}

	message := &flowmq.Message{
		Payload: []byte("Hello from Go!"),
		Headers: map[string]string{"content-type": "text/plain"},
	}

	// Use a WaitGroup to wait for the async publish to complete
	var wg sync.WaitGroup
	wg.Add(1)

	// PublishAsync sends the message and invokes the callback on completion
	producer.PublishAsync("sensor.v1.data", message, func(receipt *flowmq.PublishReceipt, err error) {
		defer wg.Done()
		if err != nil {
			fmt.Printf("Failed to publish message: %v\n", err)
			return
		}
		fmt.Printf("Message published successfully! ID: %s\n", receipt.MessageID)
	})

	wg.Wait()
}

Consuming Messages

You can consume messages from a Queue or a Stream by calling Consume and passing a MessageHandler function. The Consume method blocks and runs the handler in a dedicated goroutine until the provided context is canceled.

Consuming from a Queue

When consuming from a queue, you must explicitly Ack() each message after successful processing.

go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
	"github.com/flowmq/flowmq-sdk-go"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	config := flowmq.ClientConfig{
		BrokerAddress: "localhost:7070",
		Authentication: flowmq.NewTokenAuth("YOUR_API_TOKEN"),
	}

	client, err := flowmq.NewClient(ctx, config)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	taskQueue := flowmq.NewQueue("image.processing.tasks")

	handler := func(ctx context.Context, msg *flowmq.Message) error {
		fmt.Printf("Received from queue: %s\n", string(msg.Payload))
		// Process message...
		fmt.Println("Processing complete. Acknowledging message.")
		return msg.Ack(ctx)
	}
	
	fmt.Printf("Starting consumer on queue: %s...\n", taskQueue.Name)
	if err := client.Consume(ctx, taskQueue, handler); err != nil {
		fmt.Printf("Consumer error: %v\n", err)
	}
	
	fmt.Println("Consumer stopped.")
}

Consuming from a Stream

When consuming from a stream, you commit offsets to track progress.

go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
	"github.com/flowmq/flowmq-sdk-go"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	config := flowmq.ClientConfig{
		BrokerAddress: "localhost:7070",
		Authentication: flowmq.NewTokenAuth("YOUR_API_TOKEN"),
	}
	client, err := flowmq.NewClient(ctx, config)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	eventStream := flowmq.NewStream("user.activity.events", flowmq.StartPositionEarliest)

	handler := func(ctx context.Context, msg *flowmq.Message) error {
		fmt.Printf("Received from stream (offset: %d): %s\n", msg.Offset, string(msg.Payload))
		// Process message...
		fmt.Println("Processing complete. Committing offset.")
		return msg.CommitOffset(ctx)
	}
	
	fmt.Printf("Starting consumer on stream: %s...\n", eventStream.Name)
	if err := client.Consume(ctx, eventStream, handler); err != nil {
		fmt.Printf("Consumer error: %v\n", err)
	}
	
	fmt.Println("Consumer stopped.")
}
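
In systems that commit offsets after processing, a crash between the two steps can cause a message to be delivered again on restart. If your processing is not naturally idempotent, a handler can skip offsets it has already handled. The sketch below assumes offsets increase monotonically within a stream and that the handler is invoked by one goroutine at a time; offsetTracker is a hypothetical helper, and neither assumption is confirmed by this guide.

```go
package main

import "fmt"

// offsetTracker remembers the highest offset handled so far. It assumes
// offsets increase monotonically and that one goroutine calls it at a time.
type offsetTracker struct {
	seen bool
	last int64
}

// shouldProcess reports whether offset is new, and records it if so.
func (t *offsetTracker) shouldProcess(offset int64) bool {
	if t.seen && offset <= t.last {
		return false // replayed message, already handled
	}
	t.seen, t.last = true, offset
	return true
}

func main() {
	var tracker offsetTracker
	for _, offset := range []int64{0, 1, 2, 1, 2, 3} {
		if tracker.shouldProcess(offset) {
			fmt.Println("processing offset", offset)
		} else {
			fmt.Println("skipping replayed offset", offset)
		}
	}
}
```

This in-memory tracker only guards against replays within one process lifetime; surviving restarts would require persisting the last handled offset alongside your results.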

Full End-to-End Example

This example combines publishing and consuming to demonstrate a complete workflow.

go
package main

import (
	"context"
	"fmt"
	"time"
	"github.com/flowmq/flowmq-sdk-go"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()

	config := flowmq.ClientConfig{
		BrokerAddress: "localhost:7070",
		Authentication: flowmq.NewTokenAuth("YOUR_API_TOKEN"),
	}

	client, err := flowmq.NewClient(ctx, config)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// done is signaled once the message is received or the publish fails.
	// The send is non-blocking, so a redelivered message or a late callback
	// cannot block or panic (as a second WaitGroup.Done call would).
	done := make(chan struct{}, 1)
	signalDone := func() {
		select {
		case done <- struct{}{}:
		default:
		}
	}

	// 1. Consumer Setup (run in a goroutine)
	// (Assumes 'e2e.topic' is routed to 'e2e.queue' in FlowMQ)
	go func() {
		e2eQueue := flowmq.NewQueue("e2e.queue")
		handler := func(ctx context.Context, msg *flowmq.Message) error {
			fmt.Printf("Received message: %s\n", string(msg.Payload))
			err := msg.Ack(ctx)
			signalDone() // Signal that the message has been received and processed
			return err
		}

		fmt.Println("Consumer started and waiting for messages...")
		if err := client.Consume(ctx, e2eQueue, handler); err != nil {
			fmt.Printf("Consumer exited with error: %v\n", err)
		}
	}()

	// Give the consumer a moment to start up
	time.Sleep(1 * time.Second)

	// 2. Producer: Publish a message
	producer, err := client.NewProducer()
	if err != nil {
		panic(err)
	}
	message := &flowmq.Message{Payload: []byte("End-to-End Test Message!")}

	producer.PublishAsync("e2e.topic", message, func(receipt *flowmq.PublishReceipt, err error) {
		if err != nil {
			fmt.Printf("Publish failed: %v\n", err)
			signalDone() // Unblock if publish fails
			return
		}
		fmt.Printf("Message published successfully. ID: %s\n", receipt.MessageID)
	})

	// 3. Wait for the consumer to receive the message, or for the timeout
	select {
	case <-done:
	case <-ctx.Done():
		fmt.Println("Timed out waiting for the message.")
	}
	fmt.Println("Example finished.")
}

Additional Resources

  • Error Handling: Check errors returned from all functions. The Consume method returns an error if the context is canceled or a non-recoverable error occurs.
  • Context Management: Use context to control the lifecycle of your consumers and other operations, for example by canceling it on shutdown or attaching a deadline.
  • Advanced Configuration: Explore the ClientConfig for more options like timeouts and TLS.
  • Flow SDK Overview: For a broader understanding of the SDK's architecture and concepts, refer to the Flow SDK Developer Guide.