Connect FlowMQ with AMQP Python SDK
FlowMQ natively supports the AMQP 0-9-1 protocol, which means you can use any compliant client library to interact with it. This guide will walk you through using Python with the popular pika
library to connect to FlowMQ, declare exchanges and queues, and perform basic message publishing and consuming.
Prerequisites
Before proceeding, make sure you have the following set up:
- A running instance of FlowMQ.
- Python 3.6 or later installed on your system.
- The
pika
library for Python.
Installation
If you don't have the pika
library installed, you can add it to your project using pip:
pip install pika
Connecting to FlowMQ
The first step is to establish a connection with the FlowMQ broker. A BlockingConnection
is a straightforward way to get started.
import pika
# Establish a connection to the FlowMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
Declaring an Exchange and a Queue
In AMQP, messages are published to exchanges, which then route them to bound queues. Here's how you can declare a direct
exchange and a server-named queue.
# Declare a direct exchange
channel.exchange_declare(exchange='my_exchange', exchange_type='direct')
# Declare a queue with a random name
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
Binding the Queue to the Exchange
To receive messages, you need to bind your queue to the exchange with a specific routing_key
.
# Bind the queue to the exchange
channel.queue_bind(exchange='my_exchange', queue=queue_name, routing_key='my_key')
Publishing a Message
Now you can publish a message to the exchange with the specified routing key. The exchange will deliver it to all queues bound with that key.
# Publish the message
channel.basic_publish(
exchange='my_exchange',
routing_key='my_key',
body='Hello FlowMQ with AMQP!'
)
print(" [x] Sent 'Hello FlowMQ with AMQP!'")
Consuming Messages
To consume messages, you can define a callback function that will process incoming messages and start a consumer.
Callback Function
This function is executed whenever a message is received from the queue.
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
Starting the Consumer
# Set up the consumer
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Full Examples
Here are two separate scripts for a publisher and a consumer to demonstrate a complete workflow.
Publisher (publisher.py
)
import pika
# Connection
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Declare exchange
channel.exchange_declare(exchange='my_exchange', exchange_type='direct')
# Publish message
channel.basic_publish(
exchange='my_exchange',
routing_key='my_key',
body='Hello FlowMQ with AMQP!'
)
print(" [x] Sent 'Hello FlowMQ with AMQP!'")
# Close connection
connection.close()
Consumer (consumer.py
)
import pika
# Connection
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Declare exchange
channel.exchange_declare(exchange='my_exchange', exchange_type='direct')
# Declare queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Bind queue
channel.queue_bind(exchange='my_exchange', queue=queue_name, routing_key='my_key')
print(' [*] Waiting for messages. To exit press CTRL+C')
# Callback for message consumption
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# Start consuming
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
channel.start_consuming()
Additional Resources
- Dive deeper into the
pika
library's features in the official pika documentation. - Explore FlowMQ's more advanced AMQP capabilities.