Skip to content

Connect FlowMQ with AMQP Elixir Client

FlowMQ's native support for AMQP 0.9.1 makes it a perfect fit for building scalable, fault-tolerant applications with Elixir. This guide will walk you through using amqp, a popular Elixir client, to connect to FlowMQ, declare queues, publish messages, and consume them.

Prerequisites

Before you start, ensure you have the following:

  • A running instance of FlowMQ.
  • Elixir 1.10 or later installed.
  • Mix, the Elixir build tool.

Installation

Add amqp as a dependency to your project's mix.exs file.

elixir
def deps do
  [
    {:amqp, "~> 3.0"}
  ]
end

Then, run mix deps.get from your terminal to fetch and install the dependency.

Connecting to FlowMQ

First, establish a connection to your FlowMQ broker and open a channel. All AMQP operations are performed on a channel.

elixir
# Start an IEx session with your project's dependencies
# iex -S mix

{:ok, conn} = AMQP.Connection.open("amqp://guest:guest@localhost:5672")
{:ok, chan} = AMQP.Channel.open(conn)

Declaring a Queue

For simplicity, we'll declare a queue and publish messages to it using the default exchange.

elixir
queue_name = "hello_elixir"
AMQP.Queue.declare(chan, queue_name)

Publishing a Message

Now, you can publish a message to the default exchange. The routing key must be the name of the queue for the message to be delivered correctly.

elixir
payload = "Hello FlowMQ from Elixir!"
AMQP.Basic.publish(chan, "", queue_name, payload)

IO.puts " [x] Sent '#{payload}'"

Subscribing and Consuming Messages

To receive messages, you can subscribe to a queue. The handle_message function will be called for each message that arrives.

elixir
defmodule MyConsumer do
  use AMQP

  def handle_message(payload, meta) do
    IO.puts " [x] Received #{payload}"
    # Acknowledge the message
    AMQP.Basic.ack(meta.channel, meta.delivery_tag)
  end
end

# In IEx:
{:ok, _consumer_tag} = AMQP.Basic.subscribe(chan, queue_name, &MyConsumer.handle_message/2)

# Keep the IEx session running to receive messages.

Full Example

Here is a complete Elixir module that demonstrates how to connect, publish, and consume a message.

elixir
defmodule FlowMQ.AMQP.Example do
  use AMQP

  def run do
    # Configuration
    queue_name = "elixir.example.queue"

    # Connect and open channel
    {:ok, conn} = Connection.open("amqp://guest:guest@localhost:5672")
    {:ok, chan} = Channel.open(conn)
    IO.puts "Connected to FlowMQ!"

    # Declare queue
    Queue.declare(chan, queue_name, durable: true)

    # Register a consumer
    parent = self()
    Basic.consume(chan, queue_name, nil, no_ack: false)
    IO.puts " [*] Waiting for messages. To exit press CTRL+C twice"

    # Publish a test message
    Basic.publish(chan, "", queue_name, "Hello from Elixir!")
    IO.puts " [x] Sent 'Hello from Elixir!'"

    # Keep the process alive to receive messages
    receive do
      {:basic_deliver, payload, meta} ->
        IO.puts " [x] Received #{payload}"
        Basic.ack(chan, meta.delivery_tag)
        # We stop after one message for this example
        :ok
    after
      5_000 -> IO.puts "No message received within 5 seconds."
    end

    # Clean up
    Channel.close(chan)
    Connection.close(conn)
  end
end

# To run this, save it to a file, e.g., `example.ex`, and run `elixir example.ex`
# or run it within an IEx session: FlowMQ.AMQP.Example.run()

Additional Resources

  • For more advanced features, error handling, and examples, refer to the official AMQP Elixir documentation.
  • Explore FlowMQ's advanced features for AMQP integration.