Connect FlowMQ with AMQP Elixir Client
FlowMQ's native support for AMQP 0.9.1 makes it a perfect fit for building scalable, fault-tolerant applications with Elixir. This guide will walk you through using amqp
, a popular Elixir client, to connect to FlowMQ, declare queues, publish messages, and consume them.
Prerequisites
Before you start, ensure you have the following:
- A running instance of FlowMQ.
- Elixir 1.10 or later installed.
- Mix, the Elixir build tool.
Installation
Add amqp
as a dependency to your project's mix.exs
file.
def deps do
[
{:amqp, "~> 3.0"}
]
end
Then, run mix deps.get
from your terminal to fetch and install the dependency.
Connecting to FlowMQ
First, establish a connection to your FlowMQ broker and open a channel. All AMQP operations are performed on a channel.
# Start an IEx session with your project's dependencies
# iex -S mix
{:ok, conn} = AMQP.Connection.open("amqp://guest:guest@localhost:5672")
{:ok, chan} = AMQP.Channel.open(conn)
Declaring a Queue
For simplicity, we'll declare a queue and publish messages to it using the default exchange.
queue_name = "hello_elixir"
AMQP.Queue.declare(chan, queue_name)
Publishing a Message
Now, you can publish a message to the default exchange. The routing key must be the name of the queue for the message to be delivered correctly.
payload = "Hello FlowMQ from Elixir!"
AMQP.Basic.publish(chan, "", queue_name, payload)
IO.puts " [x] Sent '#{payload}'"
Subscribing and Consuming Messages
To receive messages, you can subscribe to a queue. The handle_message
function will be called for each message that arrives.
defmodule MyConsumer do
use AMQP
def handle_message(payload, meta) do
IO.puts " [x] Received #{payload}"
# Acknowledge the message
AMQP.Basic.ack(meta.channel, meta.delivery_tag)
end
end
# In IEx:
{:ok, _consumer_tag} = AMQP.Basic.subscribe(chan, queue_name, &MyConsumer.handle_message/2)
# Keep the IEx session running to receive messages.
Full Example
Here is a complete Elixir module that demonstrates how to connect, publish, and consume a message.
defmodule FlowMQ.AMQP.Example do
use AMQP
def run do
# Configuration
queue_name = "elixir.example.queue"
# Connect and open channel
{:ok, conn} = Connection.open("amqp://guest:guest@localhost:5672")
{:ok, chan} = Channel.open(conn)
IO.puts "Connected to FlowMQ!"
# Declare queue
Queue.declare(chan, queue_name, durable: true)
# Register a consumer
parent = self()
Basic.consume(chan, queue_name, nil, no_ack: false)
IO.puts " [*] Waiting for messages. To exit press CTRL+C twice"
# Publish a test message
Basic.publish(chan, "", queue_name, "Hello from Elixir!")
IO.puts " [x] Sent 'Hello from Elixir!'"
# Keep the process alive to receive messages
receive do
{:basic_deliver, payload, meta} ->
IO.puts " [x] Received #{payload}"
Basic.ack(chan, meta.delivery_tag)
# We stop after one message for this example
:ok
after
5_000 -> IO.puts "No message received within 5 seconds."
end
# Clean up
Channel.close(chan)
Connection.close(conn)
end
end
# To run this, save it to a file, e.g., `example.ex`, and run `elixir example.ex`
# or run it within an IEx session: FlowMQ.AMQP.Example.run()
Additional Resources
- For more advanced features, error handling, and examples, refer to the official AMQP Elixir documentation.
- Explore FlowMQ's advanced features for AMQP integration.