Skip to content

Connect FlowMQ with Kafka Go SDK

FlowMQ's native support for the Apache Kafka protocol allows developers to use a wide variety of Kafka client libraries. This guide provides a walkthrough of using the Confluent Kafka Go client to produce and consume messages from FlowMQ.

Prerequisites

Before you begin, ensure you have the following:

  • A running instance of FlowMQ.
  • Go 1.16 or later installed on your system.

Installation

You can add the Confluent Kafka Go client to your project using go get.

bash
go get -u github.com/confluentinc/confluent-kafka-go/v2/kafka

Producer: Connecting and Sending Messages

A producer sends records to a topic in FlowMQ.

Configuration and Connection

First, create a producer instance with the necessary configuration, including the bootstrap.servers for your FlowMQ instance.

go
import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        panic(err)
    }
    defer p.Close()

    // ...
}

Producing a Message

To send a message, you need to specify the topic and the message payload. The library handles partitioning automatically unless specified otherwise. You should also handle delivery reports to ensure messages are sent successfully.

go
    // Delivery report handler for produced messages
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                }
            }
        }
    }()

    // Produce messages to topic (asynchronously)
    topic := "my_topic"
    for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "examples"} {
        p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }

    // Wait for message deliveries before shutting down
    p.Flush(15 * 1000)

Consumer: Connecting and Receiving Messages

A consumer subscribes to topics and processes the records sent to them.

Configuration and Connection

Configure the consumer by specifying the bootstrap.servers and a group.id. The auto.offset.reset property tells the consumer where to begin reading if it has no committed offset.

go
import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "my_consumer_group",
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        panic(err)
    }
    defer c.Close()

    // ...
}

Subscribing and Consuming

After creating a consumer, subscribe it to a topic and enter a loop to poll for new messages.

go
    topic := "my_topic"
    c.SubscribeTopics([]string{topic}, nil)

    for {
        msg, err := c.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        } else {
            // The client will automatically try to recover from all errors.
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
    }

Additional Resources