AMQP Protocol

FlowMQ provides native AMQP 0-9-1 support, allowing you to use standard AMQP clients and libraries to connect, publish, and consume messages. AMQP is ideal for enterprise messaging, task queuing, and reliable message delivery.

Overview

FlowMQ maps AMQP concepts onto its unified routing model as follows:

  • AMQP Exchanges map to Topics
  • AMQP Queues map to Queues
  • AMQP Bindings map to Topic Filters

Connection

Connect to FlowMQ using standard AMQP 0-9-1 clients:

python
import pika

# Connect to FlowMQ
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='your-namespace.flowmq.io',
        port=5672,
        credentials=pika.PlainCredentials('username', 'password')
    )
)
channel = connection.channel()

Publishing Messages

Direct Exchange

Publish messages to a direct exchange with routing keys:

python
# Declare a direct exchange
channel.exchange_declare(exchange='orders', exchange_type='direct', durable=True)

# Publish a message
channel.basic_publish(
    exchange='orders',
    routing_key='eu.new',
    body='{"order_id": "12345", "amount": 99.99}',
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent message
        content_type='application/json'
    )
)

Topic Exchange

Use topic exchanges for pattern-based routing:

python
# Declare a topic exchange
channel.exchange_declare(exchange='sensors', exchange_type='topic', durable=True)

# Publish with routing key
channel.basic_publish(
    exchange='sensors',
    routing_key='temperature.us.california',
    body='{"value": 22.5, "unit": "celsius"}'
)

Fanout Exchange

Broadcast messages to all bound queues:

python
# Declare a fanout exchange
channel.exchange_declare(exchange='notifications', exchange_type='fanout', durable=True)

# Publish to all subscribers
channel.basic_publish(
    exchange='notifications',
    routing_key='',  # Ignored for fanout
    body='{"message": "System maintenance scheduled"}'
)
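
The three exchange types differ only in how the routing key of a message is compared with the key of each binding. The following is a minimal pure-Python model of the standard AMQP 0-9-1 rules, not FlowMQ's implementation; the names route and topic_matches are illustrative and do not belong to any client library.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """AMQP topic match: '*' is exactly one word, '#' is zero or more words."""
    pattern = binding_key.split(".") if binding_key else []
    words = routing_key.split(".") if routing_key else []

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: succeed only if every word was consumed
            return w == len(words)
        if pattern[p] == "#":
            # '#' either absorbs nothing, or absorbs one word and stays active
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w < len(words) and pattern[p] in ("*", words[w]):
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


def route(exchange_type: str, bindings, routing_key: str):
    """Return the sorted queue names that would receive the message.

    bindings is a list of (queue_name, binding_key) pairs.
    """
    if exchange_type == "fanout":
        selected = [queue for queue, _ in bindings]  # routing key ignored
    elif exchange_type == "direct":
        selected = [queue for queue, key in bindings if key == routing_key]
    elif exchange_type == "topic":
        selected = [queue for queue, key in bindings
                    if topic_matches(key, routing_key)]
    else:
        raise ValueError(f"unsupported exchange type: {exchange_type}")
    return sorted(set(selected))
```

For instance, route("direct", [("order-processing", "eu.new")], "eu.new") selects the order-processing queue, while the same binding on a fanout exchange would select it for any routing key.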

Consuming Messages

Declare and Bind Queue

python
# Declare a queue
channel.queue_declare(queue='order-processing', durable=True)

# Bind to exchange with routing key
channel.queue_bind(
    exchange='orders',
    queue='order-processing',
    routing_key='eu.new'
)

Consume Messages

python
def callback(ch, method, properties, body):
    print(f"Received: {body}")
    # Process the message
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Start consuming
channel.basic_consume(
    queue='order-processing',
    on_message_callback=callback,
    auto_ack=False  # Manual acknowledgment
)

print("Waiting for messages...")
channel.start_consuming()
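
With manual acknowledgment, the callback also has to decide what happens when processing fails. The sketch below wraps a handler so that successful messages are acknowledged and failing ones are rejected without requeueing. make_callback and handler are hypothetical names; basic_ack and basic_reject are standard AMQP 0-9-1 channel methods exposed by pika, but how FlowMQ treats rejected messages is not covered here and should be verified.

```python
import json


def make_callback(handler):
    """Build a pika-style callback that acks on success and rejects on failure."""

    def callback(ch, method, properties, body):
        try:
            # Decode the JSON payload and hand it to the application code
            handler(json.loads(body))
        except Exception:
            # Reject without requeue so a malformed message is not
            # redelivered to this consumer in an endless loop
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback
```

The returned function can be passed as on_message_callback to channel.basic_consume with auto_ack=False.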

Topic Filter Mapping

FlowMQ automatically maps AMQP routing keys to MQTT-style topic filters by replacing the '.' word separator with the '/' level separator:

AMQP Routing Key       Mapped Topic Filter
orders.eu.new          orders/eu/new
sensors.temperature    sensors/temperature
logs.error             logs/error

Wildcard Support

AMQP topic exchange wildcards map to their MQTT equivalents: * (exactly one word) becomes + (exactly one level), and # (zero or more words) stays #:

AMQP Binding Key    Mapped Topic Filter
orders.*.new        orders/+/new
sensors.#           sensors/#
logs.error.*        logs/error/+
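
The mappings in the two tables above can be reproduced with a word-by-word translation. This is a sketch inferred from the table rows, not FlowMQ's actual code, and binding_key_to_topic_filter is an illustrative name. MQTT only permits # as the final level, so the sketch rejects a binding key with # elsewhere; how FlowMQ handles that case is not stated in this document.

```python
def binding_key_to_topic_filter(binding_key: str) -> str:
    """Translate an AMQP routing or binding key into an MQTT-style topic filter."""
    words = binding_key.split(".")
    levels = []
    for index, word in enumerate(words):
        if word == "*":
            levels.append("+")  # single-word wildcard -> single-level wildcard
        elif word == "#":
            if index != len(words) - 1:
                # Assumption: no MQTT equivalent exists for a non-final '#'
                raise ValueError("MQTT allows '#' only as the last level")
            levels.append("#")
        else:
            levels.append(word)  # literal words are copied unchanged
    return "/".join(levels)
```

Keys without wildcards pass through the same function, so it covers plain routing keys as well as binding patterns.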

Interoperability

AMQP → MQTT

Messages published via AMQP can be consumed by MQTT clients:

python
# AMQP Producer
channel.basic_publish(
    exchange='sensors',
    routing_key='temperature.us.california',
    body='{"value": 22.5}'
)

bash
# MQTT Consumer
mqttx sub -h your-namespace.flowmq.io -t "sensors/temperature/us/california"

MQTT → AMQP

Messages published via MQTT can be consumed by AMQP clients:

bash
# MQTT Producer
mqttx pub -h your-namespace.flowmq.io -t "orders/eu/new" -m '{"order_id": "12345"}'
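
python
# AMQP Consumer: bind the queue, then consume
# (callback is the function defined under "Consume Messages")
channel.queue_bind(
    exchange='orders',
    queue='order-processing',
    routing_key='eu.new'
)
channel.basic_consume(queue='order-processing', on_message_callback=callback)
channel.start_consuming()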
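
In both examples above, the MQTT topic appears to be the exchange name followed by the routing key, with dots turned into slashes. The helpers below capture that reading. They are inferred only from these two examples, so treat the rule and the names amqp_to_mqtt_topic and mqtt_topic_to_amqp as assumptions to check against your own namespace.

```python
def amqp_to_mqtt_topic(exchange: str, routing_key: str) -> str:
    """Compose the MQTT topic for a message published via AMQP (assumed rule)."""
    levels = [exchange]
    if routing_key:
        levels.extend(routing_key.split("."))
    return "/".join(levels)


def mqtt_topic_to_amqp(topic: str):
    """Split an MQTT topic into (exchange, routing_key) under the same assumption."""
    exchange, _, rest = topic.partition("/")
    return exchange, rest.replace("/", ".")
```

Under this assumption, a message published to exchange sensors with routing key temperature.us.california corresponds to the MQTT topic used in the subscriber command above.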

Default Exchanges

FlowMQ provides the standard pre-declared AMQP exchanges:

  • amq.direct - Direct routing by routing key
  • amq.topic - Topic-based routing with wildcards
  • amq.fanout - Broadcast to all bound queues

Default Queue Binding

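Each queue is automatically bound to amq.direct with the queue name as the routing key. The example below publishes through the default exchange (the empty exchange name), which in AMQP 0-9-1 routes a message to the queue whose name equals the routing key: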

python
# This queue is automatically bound to amq.direct with routing key 'my-queue'
channel.queue_declare(queue='my-queue', durable=True)

# Publish directly to queue
channel.basic_publish(
    exchange='',  # Default exchange
    routing_key='my-queue',
    body='Direct message to queue'
)

Message Properties

AMQP supports rich message properties:

python
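import json
import time

import pika

# Example payload
order_data = {"order_id": "12345", "amount": 99.99}

properties = pika.BasicProperties(
    delivery_mode=2,  # Persistent
    content_type='application/json',
    content_encoding='utf-8',
    headers={
        'user_id': '12345',
        'priority': 'high'
    },
    priority=5,
    correlation_id='corr-123',
    reply_to='reply-queue',
    expiration='60000',  # TTL in milliseconds, passed as a string
    message_id='msg-456',
    timestamp=int(time.time()),  # Seconds since the Unix epoch
    type='order.created',
    # Some brokers (e.g. RabbitMQ) reject the publish unless user_id
    # matches the authenticated connection user
    user_id='username',
    app_id='order-service'
)

channel.basic_publish(
    exchange='orders',
    routing_key='eu.new',
    body=json.dumps(order_data),
    properties=properties
)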
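
Two of these properties are easy to get wrong: expiration must be a string holding milliseconds, and timestamp is an integer number of seconds. A small helper can build the body and the property values together. This is a sketch, and build_message is a hypothetical name; the returned dictionary is meant to be unpacked into pika.BasicProperties(**props).

```python
import json
import time
import uuid


def build_message(payload: dict, ttl_seconds: float, reply_to=None):
    """Return (body, props) for a persistent JSON message."""
    body = json.dumps(payload).encode("utf-8")
    props = {
        "delivery_mode": 2,  # persistent
        "content_type": "application/json",
        "content_encoding": "utf-8",
        # AMQP carries the per-message TTL as a string of milliseconds
        "expiration": str(int(ttl_seconds * 1000)),
        "message_id": str(uuid.uuid4()),
        "timestamp": int(time.time()),
    }
    if reply_to is not None:
        # Request-reply: name the reply queue and tag the request
        props["reply_to"] = reply_to
        props["correlation_id"] = str(uuid.uuid4())
    return body, props
```

Generating message_id and correlation_id per call avoids reusing the fixed placeholder values shown in the example above.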

Error Handling

python
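try:
    # Enable publisher confirms before publishing; with a blocking channel,
    # basic_publish then raises NackError if the broker rejects the message
    channel.confirm_delivery()

    # Declare queue
    channel.queue_declare(queue='my-queue', durable=True)

    # Publish message
    channel.basic_publish(
        exchange='orders',
        routing_key='eu.new',
        body='Message content'
    )

# Catch the specific errors first: AMQPError is the base class of the
# other two, so listing it first would make the later clauses unreachable
except pika.exceptions.AMQPConnectionError as e:
    print(f"Connection Error: {e}")
except pika.exceptions.AMQPChannelError as e:
    print(f"Channel Error: {e}")
except pika.exceptions.AMQPError as e:
    print(f"AMQP Error: {e}")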
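
Printing the error is rarely enough for transient failures such as a dropped connection. One common approach is to retry with exponential backoff. The helper below is a generic sketch: publish_with_retry and its parameters are hypothetical names, and the publish callable is expected to reconnect and reopen its channel itself if needed, since a channel is unusable after a connection error.

```python
import time


def publish_with_retry(publish, retry_on, attempts=3, base_delay=0.5,
                       sleep=time.sleep):
    """Call publish() until it succeeds or the attempts are used up.

    retry_on is an exception class (or tuple of classes) worth retrying,
    for example pika.exceptions.AMQPConnectionError.
    """
    for attempt in range(1, attempts + 1):
        try:
            return publish()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Wait 0.5s, 1s, 2s, ... (with the default base_delay)
            sleep(base_delay * 2 ** (attempt - 1))
```

Errors outside retry_on propagate immediately, so programming mistakes are not hidden behind retries.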

Best Practices

  1. Use Durable Queues and Exchanges for message persistence
  2. Enable Publisher Confirms for reliable publishing
  3. Use Manual Acknowledgments so that unacknowledged messages are redelivered
  4. Set Appropriate TTL for message expiration
  5. Use Correlation IDs for request-reply patterns
  6. Implement Dead Letter Queues for failed messages
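
For item 5, the requester needs to remember which correlation IDs are outstanding and ignore replies it does not recognize. A minimal in-memory sketch follows; ReplyMatcher is a hypothetical helper, independent of any client library. The ID it returns would be sent as the correlation_id property, and the reply consumer would pass properties.correlation_id to resolve.

```python
import uuid


class ReplyMatcher:
    """Track outstanding requests by correlation ID."""

    def __init__(self):
        self._waiting = {}  # correlation_id -> caller-supplied context

    def register(self, context) -> str:
        """Record a new request and return the correlation ID to send with it."""
        correlation_id = uuid.uuid4().hex
        self._waiting[correlation_id] = context
        return correlation_id

    def resolve(self, correlation_id, body):
        """Return (context, body) for a known ID, or None for stale or unknown replies."""
        if correlation_id not in self._waiting:
            return None
        return self._waiting.pop(correlation_id), body
```

Removing the entry on the first match means a duplicated reply is ignored rather than processed twice.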

Client Libraries

FlowMQ works with any standard AMQP 0-9-1 client library:

  • Python: pika, aio-pika
  • Java: amqp-client, spring-amqp
  • Node.js: amqplib
  • Go: rabbitmq/amqp091-go (maintained successor to streadway/amqp)
  • C#: RabbitMQ.Client
  • Ruby: bunny

Connection Parameters

Parameter       Default                     Description
Host            your-namespace.flowmq.io    FlowMQ service host
Port            5672                        AMQP port
Virtual Host    /                           Namespace (if applicable)
Heartbeat       60s                         Connection heartbeat
Frame Max       131072                      Maximum frame size (bytes)
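
Many clients accept these parameters as a single AMQP URI. The sketch below assembles one from the defaults in the table, percent-encoding the credentials and the virtual host (the default / becomes %2F). build_amqp_url is an illustrative name. The heartbeat and frame_max query arguments follow the form pika.URLParameters accepts; other clients may use different names.

```python
from urllib.parse import quote, urlencode


def build_amqp_url(user, password, host="your-namespace.flowmq.io", port=5672,
                   vhost="/", heartbeat=60, frame_max=131072):
    """Build an amqp:// URI from the connection parameters."""
    query = urlencode({"heartbeat": heartbeat, "frame_max": frame_max})
    return (
        # safe='' forces '/', '@' and ':' inside values to be percent-encoded
        f"amqp://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(vhost, safe='')}?{query}"
    )
```

With pika, the result could be passed as pika.BlockingConnection(pika.URLParameters(url)) in place of the ConnectionParameters object used earlier.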

Next Steps