AMQP Protocol
FlowMQ provides native AMQP 0-9-1 support, allowing you to use standard AMQP clients and libraries to connect, publish, and consume messages. AMQP is ideal for enterprise messaging, task queuing, and reliable message delivery.
Overview
AMQP in FlowMQ maps to the unified routing model where:
- AMQP Exchanges map to Topics
- AMQP Queues map to Queues
- AMQP Bindings map to Topic Filters
Connection
Connect to FlowMQ using standard AMQP 0-9-1 clients:
import pika
# Connect to FlowMQ
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='your-namespace.flowmq.io',
port=5672,
credentials=pika.PlainCredentials('username', 'password')
)
)
channel = connection.channel()Publishing Messages
Direct Exchange
Publish messages to a direct exchange with routing keys:
# Declare a direct exchange
channel.exchange_declare(exchange='orders', exchange_type='direct', durable=True)
# Publish a message
channel.basic_publish(
exchange='orders',
routing_key='eu.new',
body='{"order_id": "12345", "amount": 99.99}',
properties=pika.BasicProperties(
delivery_mode=2, # Persistent message
content_type='application/json'
)
)Topic Exchange
Use topic exchanges for pattern-based routing:
# Declare a topic exchange
channel.exchange_declare(exchange='sensors', exchange_type='topic', durable=True)
# Publish with routing key
channel.basic_publish(
exchange='sensors',
routing_key='temperature.us.california',
body='{"value": 22.5, "unit": "celsius"}'
)Fanout Exchange
Broadcast messages to all bound queues:
# Declare a fanout exchange
channel.exchange_declare(exchange='notifications', exchange_type='fanout', durable=True)
# Publish to all subscribers
channel.basic_publish(
exchange='notifications',
routing_key='', # Ignored for fanout
body='{"message": "System maintenance scheduled"}'
)Consuming Messages
Declare and Bind Queue
# Declare a queue
channel.queue_declare(queue='order-processing', durable=True)
# Bind to exchange with routing key
channel.queue_bind(
exchange='orders',
queue='order-processing',
routing_key='eu.new'
)Consume Messages
def callback(ch, method, properties, body):
print(f"Received: {body}")
# Process the message
ch.basic_ack(delivery_tag=method.delivery_tag)
# Start consuming
channel.basic_consume(
queue='order-processing',
on_message_callback=callback,
auto_ack=False # Manual acknowledgment
)
print("Waiting for messages...")
channel.start_consuming()Topic Filter Mapping
FlowMQ automatically maps AMQP routing keys to MQTT-style topic filters:
| AMQP Routing Key | Mapped Topic Filter |
|---|---|
orders.eu.new | orders/eu/new |
sensors.temperature | sensors/temperature |
logs.error | logs/error |
Wildcard Support
AMQP topic exchanges support wildcards that map to MQTT patterns:
| AMQP Binding Key | Mapped Topic Filter |
|---|---|
orders.*.new | orders/+/new |
sensors.# | sensors/# |
logs.error.* | logs/error/+ |
Interoperability
AMQP → MQTT
Messages published via AMQP can be consumed by MQTT clients:
# AMQP Producer
channel.basic_publish(
exchange='sensors',
routing_key='temperature.us.california',
body='{"value": 22.5}'
)# MQTT Consumer
mqttx sub -h your-namespace.flowmq.io -t "sensors/temperature/us/california"MQTT → AMQP
Messages published via MQTT can be consumed by AMQP clients:
# MQTT Producer
mqttx pub -h your-namespace.flowmq.io -t "orders/eu/new" -m '{"order_id": "12345"}'# AMQP Consumer
channel.queue_bind(
exchange='orders',
queue='order-processing',
routing_key='eu.new'
)Default Exchanges
FlowMQ provides standard AMQP default exchanges:
- amq.direct - Direct routing by routing key
- amq.topic - Topic-based routing with wildcards
- amq.fanout - Broadcast to all bound queues
Default Queue Binding
Queues are automatically bound to amq.direct with the queue name as routing key:
# This queue is automatically bound to amq.direct with routing key 'my-queue'
channel.queue_declare(queue='my-queue', durable=True)
# Publish directly to queue
channel.basic_publish(
exchange='', # Default exchange
routing_key='my-queue',
body='Direct message to queue'
)Message Properties
AMQP supports rich message properties:
properties = pika.BasicProperties(
delivery_mode=2, # Persistent
content_type='application/json',
content_encoding='utf-8',
headers={
'user_id': '12345',
'priority': 'high'
},
priority=5,
correlation_id='corr-123',
reply_to='reply-queue',
expiration='60000', # TTL in milliseconds
message_id='msg-456',
timestamp=int(time.time()),
type='order.created',
user_id='user123',
app_id='order-service'
)
channel.basic_publish(
exchange='orders',
routing_key='eu.new',
body=json.dumps(order_data),
properties=properties
)Error Handling
try:
# Declare queue
channel.queue_declare(queue='my-queue', durable=True)
# Publish message
channel.basic_publish(
exchange='orders',
routing_key='eu.new',
body='Message content'
)
# Confirm delivery
channel.confirm_delivery()
except pika.exceptions.AMQPError as e:
print(f"AMQP Error: {e}")
except pika.exceptions.ChannelError as e:
print(f"Channel Error: {e}")
except pika.exceptions.ConnectionError as e:
print(f"Connection Error: {e}")Best Practices
- Use Durable Queues and Exchanges for message persistence
- Enable Publisher Confirms for reliable publishing
- Use Manual Acknowledgments for guaranteed delivery
- Set Appropriate TTL for message expiration
- Use Correlation IDs for request-reply patterns
- Implement Dead Letter Queues for failed messages
Client Libraries
FlowMQ works with any standard AMQP 0-9-1 client library:
- Python:
pika,aio-pika - Java:
amqp-client,spring-amqp - Node.js:
amqplib - Go:
streadway/amqp - C#:
RabbitMQ.Client - Ruby:
bunny
Connection Parameters
| Parameter | Default | Description |
|---|---|---|
| Host | your-namespace.flowmq.io | FlowMQ service host |
| Port | 5672 | AMQP port |
| Virtual Host | / | Namespace (if applicable) |
| Heartbeat | 60s | Connection heartbeat |
| Frame Max | 131072 | Maximum frame size |
Next Steps
- Learn about MQTT Protocol
- Explore Kafka Protocol
- Check SDK Documentation for language-specific guides