Connect FlowMQ with AMQP Go Client
FlowMQ natively supports the AMQP 0.9.1 protocol, so Go developers can build message-driven applications with an existing AMQP client. This guide walks you through using the official rabbitmq/amqp091-go client to connect to FlowMQ, declare exchanges and queues, publish messages, and consume them.
Prerequisites
Before you start, make sure you have the following:
- A running instance of FlowMQ.
- Go 1.16 or later installed.
Installation
Add the AMQP client library to your project using go get.
go get github.com/rabbitmq/amqp091-go
Connecting to FlowMQ
First, establish a connection to your FlowMQ instance and open a channel. Every operation in this guide is performed on this channel.
package main

import (
	"log"

	// The package name is amqp091, which differs from the last element of the import path.
	amqp091 "github.com/rabbitmq/amqp091-go"
)

// failOnError logs the message and exits if err is non-nil.
// Note that log.Fatalf exits immediately, so deferred calls do not run.
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %v", msg, err)
	}
}

func main() {
	conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to FlowMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// ... declare, publish, and consume logic here ...
}
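The connection string above hard-codes the default credentials. As a minimal sketch, assuming you keep connection settings in environment variables, the URI could be assembled with the standard library so that special characters in the password are percent-encoded. The FLOWMQ_* variable names and the buildAMQPURL and envOr helpers are hypothetical, not part of FlowMQ or the client library.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"os"
)

// buildAMQPURL assembles an AMQP URI and percent-encodes the credentials.
// The path "/" leaves the virtual host at the client's default.
func buildAMQPURL(user, password, host, port string) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(user, password),
		Host:   net.JoinHostPort(host, port),
		Path:   "/",
	}
	return u.String()
}

// envOr returns the value of the environment variable key,
// or fallback if the variable is unset or empty.
func envOr(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

func main() {
	// The FLOWMQ_* names are assumptions made for this example.
	host := envOr("FLOWMQ_HOST", "localhost")
	uri := buildAMQPURL(
		envOr("FLOWMQ_USER", "guest"),
		envOr("FLOWMQ_PASSWORD", "guest"),
		host,
		envOr("FLOWMQ_PORT", "5672"),
	)

	// Pass uri to amqp091.Dial in place of the hard-coded string.
	// Avoid logging it in real code because it contains the password.
	_ = uri
	fmt.Println("AMQP URI assembled for host", host)
}
```

With no variables set, buildAMQPURL produces the same amqp://guest:guest@localhost:5672/ string used in the connection example.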
Declaring an Exchange and Queue
Messages in AMQP are published to exchanges, which route them to bound queues. Here's how to declare a durable direct exchange and a durable queue.
err = ch.ExchangeDeclare(
	"my_exchange", // name
	"direct",      // type
	true,          // durable
	false,         // auto-deleted
	false,         // internal
	false,         // no-wait
	nil,           // arguments
)
failOnError(err, "Failed to declare an exchange")

q, err := ch.QueueDeclare(
	"my_queue", // name
	true,       // durable
	false,      // delete when unused
	false,      // exclusive
	false,      // no-wait
	nil,        // arguments
)
failOnError(err, "Failed to declare a queue")
Binding the Queue
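To receive messages, you must bind your queue to the exchange with a specific routing key. A direct exchange delivers a message to the queue only when the message's routing key matches this binding key exactly.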
err = ch.QueueBind(
	q.Name,        // queue name
	"my_key",      // routing key
	"my_exchange", // exchange
	false,         // no-wait
	nil,           // arguments
)
failOnError(err, "Failed to bind a queue")
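To make the routing rule concrete, here is a small in-memory model of how a direct exchange selects queues. It is only an illustration of the AMQP 0.9.1 matching rule, not FlowMQ's implementation; the binding type and the routeDirect function are hypothetical names introduced for this sketch.

```go
package main

import "fmt"

// binding records that a queue is bound to an exchange under a binding key.
type binding struct {
	exchange string
	queue    string
	key      string
}

// routeDirect returns the queues a direct exchange would deliver to:
// those bound to the exchange with a binding key equal to the routing key.
func routeDirect(bindings []binding, exchange, routingKey string) []string {
	var queues []string
	for _, b := range bindings {
		if b.exchange == exchange && b.key == routingKey {
			queues = append(queues, b.queue)
		}
	}
	return queues
}

func main() {
	bindings := []binding{
		{exchange: "my_exchange", queue: "my_queue", key: "my_key"},
	}

	// The routing key matches the binding key, so my_queue is selected.
	fmt.Println(routeDirect(bindings, "my_exchange", "my_key"))

	// No binding uses this key, so no queue is selected.
	fmt.Println(routeDirect(bindings, "my_exchange", "other_key"))
}
```

A message published with a key that matches no binding is not routed to any queue, which is why the publisher and the binding in this guide both use my_key.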
Publishing a Message
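Now you can publish a message to the exchange. Because its routing key matches the binding, it is routed to the queue bound in the previous step.
// This snippet requires "context" and "time" in the import block.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

body := "Hello FlowMQ from Go!"
// PublishWithContext replaces Publish, which is deprecated in recent
// versions of amqp091-go.
err = ch.PublishWithContext(ctx,
	"my_exchange", // exchange
	"my_key",      // routing key
	false,         // mandatory
	false,         // immediate
	amqp091.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)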
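The message body is a plain byte slice, so structured payloads have to be serialized before publishing. The following is a minimal sketch, assuming a JSON payload; the orderEvent type and the encodeBody helper are hypothetical and exist only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// orderEvent is a hypothetical payload type used only in this example.
type orderEvent struct {
	ID   int    `json:"id"`
	Item string `json:"item"`
}

// encodeBody serializes the event to JSON for use as a message body.
func encodeBody(ev orderEvent) ([]byte, error) {
	return json.Marshal(ev)
}

func main() {
	body, err := encodeBody(orderEvent{ID: 42, Item: "widget"})
	if err != nil {
		log.Fatalf("Failed to encode the message body: %v", err)
	}

	// Use body as the Body field of the Publishing struct and set
	// ContentType to "application/json" so consumers know how to decode it.
	fmt.Printf("%s\n", body)
}
```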
Consuming Messages
To consume messages, register a consumer on the queue. The messages will be delivered to a Go channel. With auto-ack enabled, as in this example, the broker treats each message as acknowledged as soon as it is delivered.
msgs, err := ch.Consume(
	q.Name, // queue
	"",     // consumer
	true,   // auto-ack
	false,  // exclusive
	false,  // no-local
	false,  // no-wait
	nil,    // args
)
failOnError(err, "Failed to register a consumer")

// A receive on a nil channel blocks forever, which keeps main running.
var forever chan struct{}

go func() {
	for d := range msgs {
		log.Printf("Received a message: %s", d.Body)
	}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
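The loop above runs until the process is killed. As a sketch of one way to stop more gracefully, the function below drains a delivery channel until it is closed or a done signal fires. To stay self-contained it models deliveries as plain byte slices instead of the client's delivery type; the drain function and its parameters are hypothetical names.

```go
package main

import "fmt"

// drain handles deliveries until msgs is closed or done fires,
// and returns the number of messages handled.
func drain(done <-chan struct{}, msgs <-chan []byte, handle func([]byte)) int {
	handled := 0
	for {
		select {
		case <-done:
			// Shutdown was requested, for example after an interrupt signal.
			return handled
		case body, ok := <-msgs:
			if !ok {
				// The delivery channel was closed, so nothing more will arrive.
				return handled
			}
			handle(body)
			handled++
		}
	}
}

func main() {
	// Simulate a delivery channel that closes after two messages.
	msgs := make(chan []byte, 2)
	msgs <- []byte("first")
	msgs <- []byte("second")
	close(msgs)

	done := make(chan struct{}) // never closed in this demo

	n := drain(done, msgs, func(b []byte) {
		fmt.Printf("Received a message: %s\n", b)
	})
	fmt.Println("handled", n)
}
```

In a real consumer, the done channel could come from a context's Done method, and the handler would read the Body field of each delivery.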
Additional Resources
- For more advanced features and examples, refer to the official rabbitmq/amqp091-go repository.
- Explore FlowMQ's advanced features for AMQP integration.