Connect FlowMQ with AMQP Python SDK

FlowMQ natively supports the AMQP 0-9-1 protocol, so any compliant client library can interact with it. This guide uses Python and the popular pika library to connect to FlowMQ, declare an exchange and a queue, and publish and consume messages.

Prerequisites

Before proceeding, make sure you have the following set up:

  • A running instance of FlowMQ.
  • Python 3.6 or later installed on your system.
  • The pika library for Python.

Installation

If you don't have the pika library installed, you can add it to your project using pip:

bash
pip install pika

Connecting to FlowMQ

The first step is to establish a connection with the FlowMQ broker. A BlockingConnection is a straightforward way to get started.

python
import pika

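# Establish a connection to the FlowMQ server on localhost.
# pika's defaults apply: port 5672, user guest/guest, virtual host "/"
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# Open a channel; all further AMQP operations go through it
channel = connection.channel()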
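
When the broker is not on localhost or requires credentials, the connection details can also be written as an AMQP URL. The following is a minimal sketch that builds one with the standard library only. The helper name `build_amqp_url` is hypothetical, and the default values (port 5672, `guest`/`guest`, virtual host `/`) are pika's own defaults, assumed here rather than confirmed FlowMQ settings.

```python
from urllib.parse import quote


def build_amqp_url(host="localhost", port=5672, user="guest",
                   password="guest", vhost="/"):
    """Return an amqp:// URL (hypothetical helper, assumed defaults).

    Credentials and the virtual host are percent-encoded so that
    characters such as "/" or "@" do not break the URL structure.
    """
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),
    )


url = build_amqp_url()
print(url)  # amqp://guest:guest@localhost:5672/%2F
```

The resulting string can be passed to `pika.URLParameters(url)` and used in place of `pika.ConnectionParameters('localhost')`. The encoding step matters because a virtual host of `/` has to appear as `%2F` inside the URL.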

Declaring an Exchange and a Queue

In AMQP, messages are published to exchanges, which then route them to bound queues. Here's how you can declare a direct exchange and a server-named queue.

python
# Declare a direct exchange
channel.exchange_declare(exchange='my_exchange', exchange_type='direct')

# Declare an exclusive queue; the empty name asks the broker to generate one
result = channel.queue_declare(queue='', exclusive=True)
# The generated name is returned in the declare response
queue_name = result.method.queue

Binding the Queue to the Exchange

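To receive messages, bind your queue to the exchange with a routing_key. A direct exchange delivers a message to a queue only when the message's routing key exactly matches the key used in the binding.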

python
# Bind the queue to the exchange
channel.queue_bind(exchange='my_exchange', queue=queue_name, routing_key='my_key')
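
The declare and bind steps above can be grouped into one function so that publisher and consumer scripts set up the same topology. This is a sketch, not part of pika: the name `declare_and_bind` is hypothetical, and `channel` is assumed to behave like the pika channel created earlier.

```python
def declare_and_bind(channel, exchange, routing_key):
    """Declare a direct exchange and an exclusive server-named queue, then bind them.

    `channel` is assumed to be a pika BlockingChannel (or an object with
    the same methods). Returns the queue name generated by the broker.
    """
    channel.exchange_declare(exchange=exchange, exchange_type='direct')
    # An empty queue name asks the broker to pick a unique name
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
    return queue_name
```

With the channel from the connection step, a call such as `queue_name = declare_and_bind(channel, 'my_exchange', 'my_key')` would replace the three separate calls.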

Publishing a Message

Now you can publish a message to the exchange with the specified routing key. The exchange will deliver it to all queues bound with that key.

python
# Publish the message
channel.basic_publish(
    exchange='my_exchange',
    routing_key='my_key',
    body='Hello FlowMQ with AMQP!'
)
print(" [x] Sent 'Hello FlowMQ with AMQP!'")
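
To make the routing rule concrete, here is a toy in-memory model of a direct exchange. It is an illustration only and does not use pika or talk to FlowMQ; the class `ToyDirectExchange` and the key names other than `my_key` are made up for the example. Queues are plain Python lists.

```python
from collections import defaultdict


class ToyDirectExchange:
    """In-memory stand-in for a direct exchange (illustration only)."""

    def __init__(self):
        # binding key -> list of queues bound with that key
        self.bindings = defaultdict(list)

    def bind(self, queue, routing_key):
        self.bindings[routing_key].append(queue)

    def publish(self, routing_key, body):
        # Copy the message to every queue whose binding key matches exactly
        matched = self.bindings.get(routing_key, [])
        for queue in matched:
            queue.append(body)
        return len(matched)


exchange = ToyDirectExchange()
orders, audit, other = [], [], []
exchange.bind(orders, 'my_key')
exchange.bind(audit, 'my_key')
exchange.bind(other, 'other_key')

delivered = exchange.publish('my_key', 'Hello FlowMQ with AMQP!')
dropped = exchange.publish('unbound_key', 'nobody is listening')
print(delivered, dropped)  # 2 0
```

Both queues bound with `my_key` receive a copy, the queue bound with `other_key` receives nothing, and a message whose key matches no binding reaches no queue.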

Consuming Messages

To consume messages, define a callback function that processes each incoming message, then start a consumer on the queue.

Callback Function

This function is executed whenever a message is received from the queue.

python
def callback(ch, method, properties, body):
    # body arrives as bytes; decode it to text before printing
    print(f" [x] Received {body.decode()}")
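
If the consumer is started with `auto_ack=False` instead of the `auto_ack=True` used in this guide, the callback has to acknowledge each message itself. A minimal sketch, assuming FlowMQ handles manual acknowledgements as AMQP 0-9-1 describes; the name `ack_callback` is hypothetical.

```python
def ack_callback(ch, method, properties, body):
    """Process a message, then acknowledge it explicitly."""
    # body is bytes; replace undecodable bytes instead of raising
    text = body.decode('utf-8', errors='replace')
    print(f" [x] Received {text}")
    # Confirm to the broker that this delivery has been handled
    ch.basic_ack(delivery_tag=method.delivery_tag)
```

It would be registered with `channel.basic_consume(queue=queue_name, on_message_callback=ack_callback, auto_ack=False)`. Acknowledging only after processing means a message is not marked as handled if the callback fails partway through.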

Starting the Consumer

python
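# Register the callback; auto_ack=True treats each message as acknowledged once delivered
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

print(' [*] Waiting for messages. To exit press CTRL+C')
# Blocks here, dispatching each incoming message to the callback
channel.start_consuming()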
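
Pressing CTRL+C while `start_consuming()` is blocking raises `KeyboardInterrupt`. One way to exit without a traceback is sketched below; the function name `consume_until_interrupted` is hypothetical, and `channel` and `connection` are assumed to be the pika objects created earlier.

```python
def consume_until_interrupted(channel, connection):
    """Run the consumer loop until CTRL+C, then shut down cleanly."""
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # Stop the consumer loop started above
        channel.stop_consuming()
    connection.close()
```

Calling `consume_until_interrupted(channel, connection)` in place of `channel.start_consuming()` keeps the rest of the script unchanged.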

Full Examples

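Here are two separate scripts for a publisher and a consumer to demonstrate a complete workflow. Start the consumer first: its exclusive queue exists and is bound only while the consumer is connected, and under standard AMQP 0-9-1 routing a message published while no queue is bound to my_key is not delivered to anyone.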

Publisher (publisher.py)

python
import pika

# Connection
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare exchange
channel.exchange_declare(exchange='my_exchange', exchange_type='direct')

# Publish message
channel.basic_publish(
    exchange='my_exchange',
    routing_key='my_key',
    body='Hello FlowMQ with AMQP!'
)
print(" [x] Sent 'Hello FlowMQ with AMQP!'")

# Close connection
connection.close()
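
Message bodies are sent as bytes, so structured data has to be serialized before publishing and parsed again in the consumer. The sketch below uses JSON; the helper names `publish_json` and `decode_json` and the sample payload are hypothetical, and `channel` is assumed to be a pika channel.

```python
import json


def publish_json(channel, exchange, routing_key, payload):
    """Serialize `payload` to JSON and publish it as UTF-8 bytes."""
    body = json.dumps(payload).encode('utf-8')
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)
    return body


def decode_json(body):
    """Inverse of publish_json, for use inside a consumer callback."""
    return json.loads(body.decode('utf-8'))
```

In the publisher this could be called as `publish_json(channel, 'my_exchange', 'my_key', {'order_id': 42})`, and the consumer callback would call `decode_json(body)` instead of `body.decode()`.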

Consumer (consumer.py)

python
import pika

# Connection
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare exchange
channel.exchange_declare(exchange='my_exchange', exchange_type='direct')

# Declare queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Bind queue
channel.queue_bind(exchange='my_exchange', queue=queue_name, routing_key='my_key')

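# Callback for message consumption
def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")

# Start consuming
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

print(' [*] Waiting for messages. To exit press CTRL+C')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C: stop the consumer loop instead of exiting with a traceback
    channel.stop_consuming()

# Close connection
connection.close()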

Additional Resources

  • Dive deeper into the pika library's features in the official pika documentation.
  • Explore FlowMQ's more advanced AMQP capabilities.