Skip to content

Connect FlowMQ with AMQP Go Client

FlowMQ's native support for the AMQP 0.9.1 protocol allows Go developers to build robust, message-driven applications. This guide will walk you through using the official rabbitmq/amqp091-go client to connect to FlowMQ, declare exchanges and queues, publish messages, and consume them.

Prerequisites

Before you start, make sure you have the following:

  • A running instance of FlowMQ.
  • Go 1.16 or later installed.

Installation

Add the AMQP client library to your project using go get.

bash
go get github.com/rabbitmq/amqp091-go

Connecting to FlowMQ

First, establish a connection to your FlowMQ instance and open a channel. All operations will be performed on this channel.

go
package main

import (
	"log"
	"github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to FlowMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// ... declare, publish, and consume logic here ...
}

Declaring an Exchange and Queue

Messages in AMQP are published to exchanges, which then route them to bound queues. Here's how to declare a direct exchange and a queue.

go
	err = ch.ExchangeDeclare(
		"my_exchange", // name
		"direct",      // type
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	q, err := ch.QueueDeclare(
		"my_queue", // name
		true,       // durable
		false,      // delete when unused
		false,      // exclusive
		false,      // no-wait
		nil,        // arguments
	)
	failOnError(err, "Failed to declare a queue")

Binding the Queue

To receive messages, you must bind your queue to the exchange with a specific routing_key.

go
	err = ch.QueueBind(
		q.Name,        // queue name
		"my_key",      // routing key
		"my_exchange", // exchange
		false,
		nil,
	)
	failOnError(err, "Failed to bind a queue")

Publishing a Message

Now, you can publish a message to the exchange. It will be routed to the queue we just bound.

go
	body := "Hello FlowMQ from Go!"
	err = ch.Publish(
		"my_exchange", // exchange
		"my_key",      // routing key
		false,         // mandatory
		false,         // immediate
		amqp091.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s", body)

Consuming Messages

To consume messages, register a consumer on the queue. The messages will be delivered to a Go channel.

go
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	var forever chan struct{}

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever

Additional Resources

  • For more advanced features and examples, refer to the official rabbitmq/amqp091-go repository.
  • Explore FlowMQ's advanced features for AMQP integration.