Connect FlowMQ with Kafka Go SDK
FlowMQ's native support for the Apache Kafka protocol allows developers to use a wide variety of Kafka client libraries. This guide provides a walkthrough of using the Confluent Kafka Go client to produce and consume messages from FlowMQ.
Prerequisites
Before you begin, ensure you have the following:
- A running instance of FlowMQ.
- Go 1.16 or later installed on your system.
Installation
You can add the Confluent Kafka Go client to your project using go get:
go get -u github.com/confluentinc/confluent-kafka-go/v2/kafka
Producer: Connecting and Sending Messages
A producer sends records to a topic in FlowMQ.
Configuration and Connection
First, create a producer instance with the necessary configuration, including the bootstrap.servers for your FlowMQ instance.
import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// ...
}
Producing a Message
To send a message, you need to specify the topic and the message payload. The library handles partitioning automatically unless specified otherwise. You should also handle delivery reports to ensure messages are sent successfully.
// Delivery report handler for produced messages
go func() {
	for e := range p.Events() {
		switch ev := e.(type) {
		case *kafka.Message:
			if ev.TopicPartition.Error != nil {
				fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
			} else {
				fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
			}
		}
	}
}()

// Produce messages to topic (asynchronously)
topic := "my_topic"
for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "examples"} {
	p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(word),
	}, nil)
}

// Wait for any outstanding deliveries (up to 15 seconds) before shutting down
p.Flush(15 * 1000)
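If you need to confirm each message individually rather than watching the shared p.Events() channel, Produce also accepts a per-call delivery channel, which makes a send effectively synchronous. The sketch below assumes a FlowMQ instance is reachable at localhost:9092 and uses an illustrative topic name:

```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	topic := "my_topic" // illustrative topic name
	deliveryChan := make(chan kafka.Event, 1)

	// Produce a single message and block until its delivery report arrives.
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello FlowMQ"),
	}, deliveryChan)
	if err != nil {
		panic(err)
	}

	e := <-deliveryChan
	m := e.(*kafka.Message)
	if m.TopicPartition.Error != nil {
		fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
	} else {
		fmt.Printf("Delivered to %v\n", m.TopicPartition)
	}
}
```

Waiting on the delivery channel per message trades throughput for certainty; the asynchronous pattern above is preferable when sending at volume.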
Consumer: Connecting and Receiving Messages
A consumer subscribes to topics and processes the records sent to them.
Configuration and Connection
Configure the consumer by specifying the bootstrap.servers and a group.id. The auto.offset.reset property tells the consumer where to begin reading if it has no committed offset.
import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "my_consumer_group",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// ...
}
Subscribing and Consuming
After creating a consumer, subscribe it to a topic and enter a loop to poll for new messages.
topic := "my_topic"
if err := c.SubscribeTopics([]string{topic}, nil); err != nil {
	panic(err)
}

for {
	msg, err := c.ReadMessage(-1) // block indefinitely until a message arrives
	if err == nil {
		fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
	} else {
		// Errors here are generally informational; the client attempts
		// to recover from transient failures automatically.
		fmt.Printf("Consumer error: %v (%v)\n", err, msg)
	}
}
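In a long-running service, the poll loop should exit cleanly so that Close() can leave the consumer group gracefully. One common pattern is to trap termination signals and use Poll with a timeout instead of a blocking ReadMessage. This is a sketch under the same assumptions as above (FlowMQ at localhost:9092, illustrative topic and group names):

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "my_consumer_group",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	if err := c.SubscribeTopics([]string{"my_topic"}, nil); err != nil {
		panic(err)
	}

	// Trap SIGINT/SIGTERM so the loop can stop and Close() can leave
	// the consumer group cleanly.
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			// Poll with a short timeout so the signal channel is
			// checked regularly; Poll may return nil when idle.
			ev := c.Poll(100)
			switch e := ev.(type) {
			case *kafka.Message:
				fmt.Printf("Message on %s: %s\n", e.TopicPartition, string(e.Value))
			case kafka.Error:
				fmt.Printf("Consumer error: %v\n", e)
			}
		}
	}
}
```

Closing the consumer triggers a final offset commit (with default settings) and a group rebalance, so partitions are handed off promptly to the remaining members.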
Additional Resources
- For more detailed information, check out the official Confluent Kafka Go client repository.
- Explore FlowMQ's advanced features for Kafka integration.