MQTT Protocol
FlowMQ provides native MQTT v5.0 and v3.1.1 support, making it an ideal broker for IoT devices, mobile applications, and real-time messaging. MQTT's lightweight protocol and publish/subscribe model work seamlessly with FlowMQ's unified routing engine.
Overview
MQTT in FlowMQ maps directly to the unified routing model where:
- MQTT Topics map to Topics
- MQTT Subscriptions map to Subscriptions
- MQTT QoS Levels provide delivery guarantees
Connection
Connect to FlowMQ using any standard MQTT client:
import paho.mqtt.client as mqtt
# Create client (paho-mqtt 1.x constructor; paho-mqtt 2.x also requires a callback_api_version argument)
client = mqtt.Client(client_id="my-device-001")
# Set credentials
client.username_pw_set("username", "password")
# Connect to FlowMQ
client.connect("your-namespace.flowmq.io", 1883, 60)
client.loop_start()
Publishing Messages
Basic Publishing
# Publish a simple message
client.publish("sensors/temperature", "22.5", qos=1)
# Publish JSON payload
import json
payload = {
    "temperature": 22.5,
    "humidity": 65,
    "timestamp": "2024-01-15T10:30:00Z"
}
client.publish("sensors/room1", json.dumps(payload), qos=1)
Publishing with Properties (MQTT v5.0)
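# MQTT v5.0 properties (the client must be created with protocol=mqtt.MQTTv5)
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
properties = Properties(PacketTypes.PUBLISH)
properties.ContentType = "application/json"
properties.UserProperty = ("device_id", "sensor-001")
properties.MessageExpiryInterval = 3600  # 1 hour
properties.ResponseTopic = "sensors/room1/response"
client.publish(
    "sensors/room1/temperature",
    json.dumps({"value": 22.5}),
    qos=1,
    properties=properties
)
Batch Publishing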
# Publish multiple messages in a loop
topics = [
    ("sensors/temperature", "22.5"),
    ("sensors/humidity", "65"),
    ("sensors/pressure", "1013.25")
]
for topic, payload in topics:
    client.publish(topic, payload, qos=1)
Subscribing to Topics
Basic Subscription
def on_message(client, userdata, msg):
    print(f"Received: {msg.topic} -> {msg.payload.decode()}")
    # Process the message
# Set callback
client.on_message = on_message
# Subscribe to specific topic
client.subscribe("sensors/temperature", qos=1)
# Subscribe to multiple topics
client.subscribe([
    ("sensors/temperature", 1),
    ("sensors/humidity", 1),
    ("alerts/#", 2)
])
Wildcard Subscriptions
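MQTT compares topic filters against topic names one level at a time: `+` stands for exactly one level, and `#` stands for the parent level plus everything beneath it and must be the last level of the filter. The matching rule behind the subscriptions in this section can be sketched in plain Python. The function name `topic_matches` is an illustrative assumption, and the sketch leaves out the special handling of topics that begin with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last filter level.
            # It matches the parent level and any number of child levels.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" accepts any single level.
            return False

    # Without "#", filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("sensors/+/temperature", "sensors/room1/temperature"))  # True
print(topic_matches("sensors/+/temperature", "sensors/room1/humidity"))     # False
print(topic_matches("sensors/#", "sensors/room1/humidity"))                 # True
```

A helper like this can be handy for unit-testing subscription patterns before deploying them; the broker remains the authority on what is actually delivered.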
# Single-level wildcard
client.subscribe("sensors/+/temperature", qos=1)
# Matches: sensors/room1/temperature, sensors/room2/temperature
# Multi-level wildcard
client.subscribe("sensors/#", qos=1)
# Matches: sensors/temperature, sensors/room1/temperature, sensors/room1/humidity
# Multiple wildcards
client.subscribe("devices/+/+/status", qos=1)
# Matches: devices/sensor/001/status, devices/actuator/002/status
Subscription with Properties (MQTT v5.0)
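# MQTT v5.0 subscription properties (the client must be created with protocol=mqtt.MQTTv5)
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
subscription_properties = Properties(PacketTypes.SUBSCRIBE)
subscription_properties.UserProperty = ("client_type", "mobile-app")
subscription_properties.SubscriptionIdentifier = 1
client.subscribe(
    "sensors/#",
    qos=1,
    properties=subscription_properties
)
QoS Levels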
FlowMQ supports all MQTT QoS levels:
QoS 0 - At Most Once
# Fire and forget - no delivery guarantee
client.publish("logs/debug", "Debug message", qos=0)
QoS 1 - At Least Once
# Guaranteed delivery with possible duplicates
client.publish("orders/new", order_data, qos=1)
QoS 2 - Exactly Once
# Guaranteed exactly-once delivery
client.publish("payments/process", payment_data, qos=2)
Topic Structure
FlowMQ uses hierarchical topic names with forward slashes:
sensors/room1/temperature
devices/iot/sensor001/status
orders/eu/new/books
alerts/critical/system/maintenance
Best Practices for Topic Design
- Use Hierarchical Structure: domain/location/device/measurement
- Be Consistent: Use the same pattern across your application
- Keep Topics Short: Avoid overly long topic names
- Use Descriptive Names: Make topics self-documenting
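One way to keep topic names consistent is to build them through a single helper instead of concatenating strings by hand. The sketch below is an assumption about how an application might do this, not part of FlowMQ; `build_topic` is a hypothetical name. It rejects wildcard characters and slashes inside a level. It also rejects empty levels, which MQTT technically allows but which are usually a mistake in a publish topic.

```python
def build_topic(*levels: str) -> str:
    """Join topic levels with '/', rejecting values that are unsafe in a publish topic."""
    if not levels:
        raise ValueError("at least one topic level is required")
    for level in levels:
        if not level:
            raise ValueError("topic levels must not be empty")
        if any(ch in level for ch in "/+#"):
            # '/' would silently add a level; '+' and '#' are subscription wildcards.
            raise ValueError(f"invalid character in topic level: {level!r}")
    return "/".join(levels)


# domain/location/measurement, following the hierarchical pattern above
print(build_topic("sensors", "room1", "temperature"))  # sensors/room1/temperature
```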
Interoperability
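The examples in this section suggest a naming convention between protocols: the first MQTT topic level corresponds to the AMQP exchange, the remaining levels become a dot-separated routing key, and a Kafka topic is the MQTT topic with slashes replaced by dots. The helpers below are a sketch of that convention inferred only from those examples. The function names are hypothetical, and FlowMQ's actual mapping rules may differ or be configurable.

```python
from typing import Tuple


def mqtt_to_kafka_topic(mqtt_topic: str) -> str:
    """Assumed mapping: 'sensors/temperature' -> 'sensors.temperature'."""
    return mqtt_topic.replace("/", ".")


def mqtt_to_amqp(mqtt_topic: str) -> Tuple[str, str]:
    """Assumed mapping: first level is the exchange, the rest is the routing key."""
    exchange, _, rest = mqtt_topic.partition("/")
    return exchange, rest.replace("/", ".")


def amqp_to_mqtt(exchange: str, routing_key: str) -> str:
    """Assumed inverse mapping of mqtt_to_amqp."""
    if not routing_key:
        return exchange
    return "/".join([exchange, *routing_key.split(".")])


print(mqtt_to_amqp("orders/eu/new"))                         # ('orders', 'eu.new')
print(mqtt_to_kafka_topic("sensors/temperature"))            # sensors.temperature
print(amqp_to_mqtt("sensors", "temperature.us.california"))  # sensors/temperature/us/california
```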
MQTT → AMQP
Messages published via MQTT can be consumed by AMQP clients:
# MQTT Publisher
client.publish("orders/eu/new", '{"order_id": "12345"}', qos=1)
# AMQP Consumer
channel.queue_bind(
    exchange='orders',
    queue='order-processing',
    routing_key='eu.new'
)
MQTT → Kafka
MQTT messages can be consumed by Kafka clients:
# MQTT Publisher
client.publish("sensors/temperature", '{"value": 22.5}', qos=1)
# Kafka Consumer
kafka-console-consumer.sh --bootstrap-server your-namespace.flowmq.io:9092 --topic sensors.temperature
AMQP → MQTT
AMQP messages can be consumed by MQTT clients:
# AMQP Publisher
channel.basic_publish(
    exchange='sensors',
    routing_key='temperature.us.california',
    body='{"value": 22.5}'
)
# MQTT Subscriber
client.subscribe("sensors/temperature/us/california", qos=1)
Retained Messages
Publish messages that are retained for new subscribers:
# Publish retained message
client.publish(
    "system/status",
    '{"status": "online", "version": "1.2.3"}',
    qos=1,
    retain=True
)
# New subscribers will receive the retained message immediately
client.subscribe("system/status", qos=1)
Last Will and Testament (LWT)
Configure messages to be sent when the client disconnects unexpectedly:
# Set LWT
client.will_set(
    topic="devices/status",
    payload='{"device_id": "sensor-001", "status": "offline"}',
    qos=1,
    retain=True
)
# Connect with LWT
client.connect("your-namespace.flowmq.io", 1883, 60)
Message Properties (MQTT v5.0)
MQTT v5.0 supports rich message properties:
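# Requires a client created with protocol=mqtt.MQTTv5
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
properties = Properties(PacketTypes.PUBLISH)
properties.ContentType = "application/json"
properties.PayloadFormatIndicator = 1  # payload is UTF-8 text
properties.ResponseTopic = "commands/response"
properties.CorrelationData = b"req-123"
properties.MessageExpiryInterval = 3600
properties.UserProperty = [
    ("device_id", "sensor-001"),
    ("priority", "high")
]
client.publish(
    "commands/restart",
    '{"action": "restart"}',
    qos=1,
    properties=properties
)
Error Handling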
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected successfully")
        client.subscribe("sensors/#", qos=1)
    else:
        print(f"Connection failed with code {rc}")
def on_disconnect(client, userdata, rc):
    if rc != 0:
        print(f"Unexpected disconnection: {rc}")
        # Implement reconnection logic
def on_publish(client, userdata, mid):
    print(f"Message {mid} published successfully")
def on_subscribe(client, userdata, mid, granted_qos):
    print(f"Subscribed with QoS: {granted_qos}")
# Set callbacks
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
client.on_subscribe = on_subscribe
Security
TLS/SSL Connection
import ssl
# Enable TLS
client.tls_set(
    ca_certs="ca-certificates.crt",
    certfile="client-cert.pem",
    keyfile="client-key.pem",
    cert_reqs=ssl.CERT_REQUIRED,
    tls_version=ssl.PROTOCOL_TLSv1_2
)
# Connect with TLS
client.connect("your-namespace.flowmq.io", 8883, 60)
Authentication
# Username/password authentication
client.username_pw_set("username", "password")
# Client certificate authentication
client.tls_set(certfile="client-cert.pem", keyfile="client-key.pem")
Best Practices
Use Appropriate QoS Levels
- QoS 0: For non-critical data (sensor readings, logs)
- QoS 1: For important data (orders, notifications)
- QoS 2: For critical data (payments, transactions)
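Because QoS 1 can deliver the same message more than once, consumers of QoS 1 topics generally need to tolerate duplicates. A minimal sketch of one approach follows, assuming each message carries an application-level identifier such as the `order_id` field used elsewhere on this page. The `DuplicateFilter` class is hypothetical, and its bounded in-memory cache is only one of several possible strategies.

```python
from collections import OrderedDict


class DuplicateFilter:
    """Remember recently seen message IDs so QoS 1 redeliveries can be skipped."""

    def __init__(self, max_entries: int = 1000):
        self._seen = OrderedDict()
        self._max_entries = max_entries

    def is_new(self, message_id: str) -> bool:
        """Return True the first time an ID is seen, False for a recent repeat."""
        if message_id in self._seen:
            self._seen.move_to_end(message_id)  # keep recently repeated IDs longest
            return False
        self._seen[message_id] = True
        if len(self._seen) > self._max_entries:
            self._seen.popitem(last=False)  # evict the oldest remembered ID
        return True


dedup = DuplicateFilter()
for order_id in ["12345", "12346", "12345"]:
    if dedup.is_new(order_id):
        print(f"processing order {order_id}")
    else:
        print(f"skipping duplicate {order_id}")
```

In an `on_message` callback, the same check would run on the identifier parsed from the payload before any side effects are performed.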
Design Topic Hierarchy
- Use consistent naming patterns
- Keep topics short and descriptive
- Plan for scalability
Handle Reconnections
- Implement automatic reconnection
- Resubscribe to topics after reconnection
- Use persistent sessions when needed
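Reconnection attempts are usually spaced out with exponential backoff so that a recovering broker is not flooded. The sketch below shows the idea independently of any client library. `backoff_delays` and `reconnect_with_backoff` are hypothetical helpers, and the `connect` argument is assumed to be a callable that raises `OSError` (or a subclass such as `ConnectionRefusedError`) on failure.

```python
import random
import time


def backoff_delays(base: float = 1.0, cap: float = 60.0, jitter: float = 0.0):
    """Yield an endless sequence of exponentially growing delays, capped at `cap` seconds."""
    attempt = 0
    while True:
        # Limit the exponent so the intermediate value never overflows a float.
        delay = min(cap, base * (2 ** min(attempt, 30)))
        # Optional jitter adds up to `jitter * delay` extra seconds.
        yield delay + random.uniform(0, jitter * delay)
        attempt += 1


def reconnect_with_backoff(connect, max_attempts: int = 5, sleep=time.sleep, **backoff):
    """Call `connect()` until it succeeds or `max_attempts` is reached. Return True on success."""
    delays = backoff_delays(**backoff)
    for _ in range(max_attempts):
        try:
            connect()
            return True
        except OSError:
            sleep(next(delays))
    return False
```

With paho-mqtt, `client.reconnect` could be passed as `connect`. The library also has its own retry support through `reconnect_delay_set` together with the network loop, which may be sufficient on its own. Resubscribing inside `on_connect`, as in the Error Handling example, restores subscriptions after each reconnect.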
Monitor Message Size
- Keep messages under 256KB
- Use compression for large payloads
- Consider chunking for very large messages
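Compression and chunking can be combined: compress the payload first, then split the result into pieces that fit under the size limit. The following is a minimal sketch using only the standard library. The helper names are hypothetical, and how chunk order and count are conveyed to the receiver (for example in the topic or in user properties) is left to the application.

```python
import zlib

MAX_MESSAGE_BYTES = 256 * 1024  # the 256KB guideline from the list above


def chunk_payload(data: bytes, chunk_size: int = MAX_MESSAGE_BYTES):
    """Split `data` into consecutive chunks of at most `chunk_size` bytes."""
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]


def prepare_chunks(data: bytes, chunk_size: int = MAX_MESSAGE_BYTES):
    """Compress `data` with zlib, then split the compressed bytes into chunks."""
    return chunk_payload(zlib.compress(data), chunk_size)


def reassemble(chunks) -> bytes:
    """Inverse of prepare_chunks: join the chunks in order and decompress."""
    return zlib.decompress(b"".join(chunks))


original = b'{"temperature": 22.5, "humidity": 65}' * 10_000
chunks = prepare_chunks(original)
print(len(original), "bytes ->", len(chunks), "chunk(s)")
assert reassemble(chunks) == original
```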
Use Retained Messages Wisely
- Only retain essential configuration data
- Clear retained messages when no longer needed
- Avoid retaining frequently changing data
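In MQTT, a retained message is cleared by publishing a zero-length payload to the same topic with the retain flag set. The sketch below wraps that in a helper. `clear_retained` is a hypothetical name, and `RecordingClient` is a stand-in that only records calls so the example runs without a broker; a real paho-mqtt client would be passed in its place.

```python
def clear_retained(client, topic: str, qos: int = 1):
    """Clear the retained message on `topic` by publishing an empty retained payload."""
    # A zero-length retained payload tells the broker to drop the stored message.
    return client.publish(topic, payload=b"", qos=qos, retain=True)


class RecordingClient:
    """Hypothetical stand-in for a connected MQTT client; it only records calls."""

    def __init__(self):
        self.published = []

    def publish(self, topic, payload=None, qos=0, retain=False):
        self.published.append((topic, payload, qos, retain))


demo = RecordingClient()
clear_retained(demo, "system/status")
print(demo.published)  # [('system/status', b'', 1, True)]
```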
Client Libraries
FlowMQ works with any standard MQTT client library:
- Python: paho-mqtt, asyncio-mqtt
- JavaScript: mqtt.js, paho-mqtt
- Java: Eclipse Paho, HiveMQ
- C/C++: Eclipse Paho C, Mosquitto
- Go: paho.mqtt.golang
- Rust: rumqttc, paho-mqtt-rust
Connection Parameters
| Parameter | Default | Description |
|---|---|---|
| Host | your-namespace.flowmq.io | FlowMQ service host |
| Port | 1883 | MQTT port (TLS: 8883) |
| Keep Alive | 60s | Connection keep-alive interval |
| Clean Session | true | Start with clean session |
| Client ID | Auto-generated | Unique client identifier |
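The defaults in this table can be captured in a small configuration object so that every part of an application connects the same way. This is a sketch only: the `ConnectionParams` class and its field names are assumptions made for illustration, with values taken from the table above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ConnectionParams:
    """Connection settings mirroring the defaults in the table above."""

    host: str = "your-namespace.flowmq.io"
    use_tls: bool = False
    keepalive: int = 60              # seconds
    clean_session: bool = True
    client_id: Optional[str] = None  # None means no explicit ID; one is generated

    @property
    def port(self) -> int:
        # 1883 for plain MQTT, 8883 when TLS is enabled.
        return 8883 if self.use_tls else 1883


params = ConnectionParams(use_tls=True)
print(params.host, params.port, params.keepalive)  # your-namespace.flowmq.io 8883 60
```

With a paho-mqtt client, these values would be passed along as `client.connect(params.host, params.port, params.keepalive)`.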
Next Steps
- Learn about AMQP Protocol
- Explore Kafka Protocol
- Check SDK Documentation for language-specific guides