MQTT Protocol

FlowMQ natively supports MQTT v5.0 and v3.1.1, so it can serve as the broker for IoT devices, mobile applications, and real-time messaging. MQTT's lightweight publish/subscribe model is handled by FlowMQ's unified routing engine.

Overview

MQTT concepts map directly onto FlowMQ's unified routing model:

  • MQTT topics map to FlowMQ topics
  • MQTT subscriptions map to FlowMQ subscriptions
  • MQTT QoS levels determine the delivery guarantee

Connection

Connect to FlowMQ using any standard MQTT client:

python
import paho.mqtt.client as mqtt

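# Create client. The examples on this page use the paho-mqtt 1.x constructor;
# paho-mqtt 2.x expects a callback API version as the first argument, e.g.
# mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id="my-device-001")
client = mqtt.Client(client_id="my-device-001")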

# Set credentials
client.username_pw_set("username", "password")

# Connect to FlowMQ
client.connect("your-namespace.flowmq.io", 1883, 60)
client.loop_start()

Publishing Messages

Basic Publishing

python
# Publish a simple message
client.publish("sensors/temperature", "22.5", qos=1)

# Publish JSON payload
import json
payload = {
    "temperature": 22.5,
    "humidity": 65,
    "timestamp": "2024-01-15T10:30:00Z"
}
client.publish("sensors/room1", json.dumps(payload), qos=1)

Publishing with Properties (MQTT v5.0)

python
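# MQTT v5.0 properties (requires a client created with protocol=mqtt.MQTTv5)
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties

# paho-mqtt takes a Properties object rather than a plain dict
properties = Properties(PacketTypes.PUBLISH)
properties.ContentType = "application/json"
properties.UserProperty = ("device_id", "sensor-001")
properties.MessageExpiryInterval = 3600  # 1 hour
properties.ResponseTopic = "sensors/room1/response"

client.publish(
    "sensors/room1/temperature",
    json.dumps({"value": 22.5}),
    qos=1,
    properties=properties
)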

Batch Publishing

python
# Publish multiple messages efficiently
topics = [
    ("sensors/temperature", "22.5"),
    ("sensors/humidity", "65"),
    ("sensors/pressure", "1013.25")
]

for topic, payload in topics:
    client.publish(topic, payload, qos=1)

Subscribing to Topics

Basic Subscription

python
def on_message(client, userdata, msg):
    print(f"Received: {msg.topic} -> {msg.payload.decode()}")
    # Process the message

# Set callback
client.on_message = on_message

# Subscribe to specific topic
client.subscribe("sensors/temperature", qos=1)

# Subscribe to multiple topics
client.subscribe([
    ("sensors/temperature", 1),
    ("sensors/humidity", 1),
    ("alerts/#", 2)
])
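
The callback above decodes every payload as text. When subscribers expect JSON, as in the publishing examples, it can help to decode defensively so one malformed message does not raise inside the network loop. The following is a minimal sketch; the helper name `decode_json_payload` is an assumption for illustration and is not part of paho-mqtt or FlowMQ.

```python
import json
from typing import Any, Optional


def decode_json_payload(payload: bytes) -> Optional[Any]:
    """Return the decoded JSON value, or None if the payload is not valid UTF-8 JSON."""
    try:
        return json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


def on_message(client, userdata, msg):
    # Same callback shape as above, but tolerant of non-JSON payloads
    data = decode_json_payload(msg.payload)
    if data is None:
        print(f"Ignoring non-JSON payload on {msg.topic}")
        return
    print(f"Received: {msg.topic} -> {data}")
```

Returning `None` keeps the callback simple; a stricter application might log the raw bytes or route them to a separate error topic instead.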

Wildcard Subscriptions

python
# Single-level wildcard
client.subscribe("sensors/+/temperature", qos=1)
# Matches: sensors/room1/temperature, sensors/room2/temperature

# Multi-level wildcard
client.subscribe("sensors/#", qos=1)
# Matches: sensors/temperature, sensors/room1/temperature, sensors/room1/humidity

# Multiple wildcards
client.subscribe("devices/+/+/status", qos=1)
# Matches: devices/sensor/001/status, devices/actuator/002/status
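
To make the wildcard rules concrete, here is a small standalone matcher. It is only an illustration of how `+` and `#` behave, not how FlowMQ or paho-mqtt implement matching, and it ignores the special handling of topics that start with `$`. The function name `topic_matches` is chosen for this sketch.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` is matched by the MQTT `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level; it matches the parent
            # level and any number of levels below it
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # the filter has more levels than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # a literal level must match exactly
    # Without '#', both sides must have the same number of levels
    return len(filter_levels) == len(topic_levels)
```

For example, `topic_matches("sensors/+/temperature", "sensors/room1/temperature")` is true, while the same filter does not match `sensors/room1/humidity` or a topic with an extra level.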

Subscription with Properties (MQTT v5.0)

python
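# MQTT v5.0 subscription properties (requires protocol=mqtt.MQTTv5)
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties

subscription_properties = Properties(PacketTypes.SUBSCRIBE)
subscription_properties.UserProperty = ("client_type", "mobile-app")
subscription_properties.SubscriptionIdentifier = 1

client.subscribe(
    "sensors/#",
    qos=1,
    properties=subscription_properties
)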

QoS Levels

FlowMQ supports all MQTT QoS levels:

QoS 0 - At Most Once

python
# Fire and forget - no delivery guarantee
client.publish("logs/debug", "Debug message", qos=0)

QoS 1 - At Least Once

python
# Delivered at least once; subscribers may receive duplicates
order_data = '{"order_id": "12345"}'  # example payload
client.publish("orders/new", order_data, qos=1)

QoS 2 - Exactly Once

python
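# Delivered exactly once, at the cost of a longer handshake
payment_data = '{"payment_id": "p-001"}'  # placeholder payload for illustration
client.publish("payments/process", payment_data, qos=2)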
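
Because QoS 1 can deliver the same message more than once, consumers are often written to be idempotent. One common approach, sketched below, is to remember recently seen application-level IDs and skip repeats. The class name `RecentIds` and the idea of keying on a field such as `order_id` are assumptions for illustration; nothing here is a FlowMQ or paho-mqtt API.

```python
from collections import OrderedDict


class RecentIds:
    """Remember the most recent message IDs so redelivered messages can be skipped."""

    def __init__(self, capacity: int = 1000) -> None:
        self._capacity = capacity
        self._seen: "OrderedDict[str, None]" = OrderedDict()

    def first_time(self, message_id: str) -> bool:
        """Return True the first time an ID is seen, False for a duplicate."""
        if message_id in self._seen:
            self._seen.move_to_end(message_id)  # keep recently repeated IDs longer
            return False
        self._seen[message_id] = None
        if len(self._seen) > self._capacity:
            self._seen.popitem(last=False)  # evict the oldest ID
        return True
```

In an `on_message` callback, the handler would parse the payload, call `first_time(order["order_id"])`, and return early when it yields `False`. The bounded capacity trades memory for the small risk of missing a very late duplicate.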

Topic Structure

FlowMQ uses hierarchical topic names with forward slashes:

sensors/room1/temperature
devices/iot/sensor001/status
orders/eu/new/books
alerts/critical/system/maintenance

Best Practices for Topic Design

  1. Use Hierarchical Structure: domain/location/device/measurement
  2. Be Consistent: Use the same pattern across your application
  3. Keep Topics Short: Avoid overly long topic names
  4. Use Descriptive Names: Make topics self-documenting
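
One way to keep topic names consistent is to build them through a single helper instead of formatting strings ad hoc. The sketch below is a hypothetical helper (`build_topic` is not part of any library) that joins levels and rejects characters that are reserved in published topic names.

```python
def build_topic(*levels: str) -> str:
    """Join topic levels with '/', rejecting empty levels and wildcard characters."""
    if not levels:
        raise ValueError("at least one topic level is required")
    for level in levels:
        if not level:
            raise ValueError("topic levels must not be empty")
        if any(ch in level for ch in "/+#"):
            # '/' would silently add a level; '+' and '#' are wildcards
            raise ValueError(f"invalid character in topic level: {level!r}")
    return "/".join(levels)
```

Calling `build_topic("sensors", "room1", "temperature")` yields `sensors/room1/temperature`, following the domain/location/device/measurement pattern described above.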

Interoperability

MQTT → AMQP

Messages published via MQTT can be consumed by AMQP clients:

python
# MQTT Publisher
client.publish("orders/eu/new", '{"order_id": "12345"}', qos=1)

python
# AMQP Consumer (channel is an already-open AMQP channel, e.g. created with pika)
channel.queue_bind(
    exchange='orders',
    queue='order-processing',
    routing_key='eu.new'
)

MQTT → Kafka

MQTT messages can be consumed by Kafka clients:

python
# MQTT Publisher
client.publish("sensors/temperature", '{"value": 22.5}', qos=1)

bash
# Kafka Consumer
kafka-console-consumer.sh --bootstrap-server your-namespace.flowmq.io:9092 --topic sensors.temperature

AMQP → MQTT

AMQP messages can be consumed by MQTT clients:

python
# AMQP Publisher
channel.basic_publish(
    exchange='sensors',
    routing_key='temperature.us.california',
    body='{"value": 22.5}'
)

python
# MQTT Subscriber
client.subscribe("sensors/temperature/us/california", qos=1)
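
The three interoperability examples suggest a naming pattern: MQTT's `/` separators become `.` in Kafka topic names and AMQP routing keys, and the first MQTT level corresponds to the AMQP exchange. The helpers below encode that pattern as inferred from the examples only. They are not a documented FlowMQ API, and the actual mapping rules may differ, so treat this as a sketch for reasoning about names.

```python
from typing import Tuple


def mqtt_to_kafka_topic(mqtt_topic: str) -> str:
    """'sensors/temperature' -> 'sensors.temperature' (pattern inferred from the examples)."""
    return mqtt_topic.replace("/", ".")


def mqtt_to_amqp(mqtt_topic: str) -> Tuple[str, str]:
    """Split an MQTT topic into (exchange, routing_key), assuming the first level is the exchange."""
    exchange, _, rest = mqtt_topic.partition("/")
    return exchange, rest.replace("/", ".")


def amqp_to_mqtt(exchange: str, routing_key: str) -> str:
    """Inverse of mqtt_to_amqp under the same assumption."""
    return exchange + "/" + routing_key.replace(".", "/")
```

With these assumptions, `mqtt_to_amqp("orders/eu/new")` gives `("orders", "eu.new")`, matching the exchange and routing key used in the AMQP consumer example.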

Retained Messages

Publish a message with the retain flag set so that the broker stores it and delivers it to future subscribers of the topic:

python
# Publish retained message
client.publish(
    "system/status",
    '{"status": "online", "version": "1.2.3"}',
    qos=1,
    retain=True
)

# New subscribers will receive the retained message immediately
client.subscribe("system/status", qos=1)
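
In MQTT, a retained message is removed by publishing an empty payload with the retain flag set on the same topic. A minimal sketch follows; `clear_retained` is a hypothetical helper name, and `client` is assumed to be a connected paho-mqtt client like the one used above.

```python
def clear_retained(client, topic: str) -> None:
    """Ask the broker to drop the retained message stored for `topic`."""
    # A retained publish with a zero-length payload removes the stored message
    client.publish(topic, payload=b"", qos=1, retain=True)
```

For example, `clear_retained(client, "system/status")` would remove the status message retained in the example above, so new subscribers no longer receive it on subscribe.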

Last Will and Testament (LWT)

Configure a message for the broker to publish on the client's behalf if the client disconnects unexpectedly:

python
# Set LWT (must be called before connect())
client.will_set(
    topic="devices/status",
    payload='{"device_id": "sensor-001", "status": "offline"}',
    qos=1,
    retain=True
)

# Connect with LWT
client.connect("your-namespace.flowmq.io", 1883, 60)
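
A common companion to LWT is a presence pattern: the will announces "offline", and the client publishes a retained "online" message once connected, so subscribers always see the latest state. The sketch below assumes a per-device topic layout `devices/<device_id>/status`, which is a hypothetical variation on the `devices/status` topic above, and the helper names are invented for illustration.

```python
import json


def status_payload(device_id: str, status: str) -> str:
    """Build the JSON status document used by both the will and the online message."""
    return json.dumps({"device_id": device_id, "status": status})


def register_will(client, device_id: str) -> str:
    """Set the LWT before connecting and return the status topic."""
    topic = f"devices/{device_id}/status"  # hypothetical per-device layout
    client.will_set(topic, payload=status_payload(device_id, "offline"), qos=1, retain=True)
    return topic


def announce_online(client, device_id: str) -> None:
    """Publish the retained 'online' state; call once connected, e.g. from on_connect."""
    topic = f"devices/{device_id}/status"
    client.publish(topic, status_payload(device_id, "online"), qos=1, retain=True)
```

Using one topic per device means each retained status is stored independently, rather than devices overwriting one shared retained message.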

Message Properties (MQTT v5.0)

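MQTT v5.0 lets a publisher attach metadata to a message, such as a content type, a response topic, correlation data, an expiry interval, and user properties:

python
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties

# Requires a client created with protocol=mqtt.MQTTv5
properties = Properties(PacketTypes.PUBLISH)
properties.ContentType = "application/json"
properties.PayloadFormatIndicator = 1  # payload is UTF-8 text
properties.ResponseTopic = "commands/response"
properties.CorrelationData = b"req-123"
properties.MessageExpiryInterval = 3600
properties.UserProperty = [
    ("device_id", "sensor-001"),
    ("priority", "high")
]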

client.publish(
    "commands/restart",
    '{"action": "restart"}',
    qos=1,
    properties=properties
)

Error Handling

python
# Callback signatures below follow paho-mqtt 1.x (callback API version 1) with MQTT v3.1.1
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected successfully")
        client.subscribe("sensors/#", qos=1)
    else:
        print(f"Connection failed with code {rc}")

def on_disconnect(client, userdata, rc):
    if rc != 0:
        print(f"Unexpected disconnection: {rc}")
        # Implement reconnection logic

def on_publish(client, userdata, mid):
    print(f"Message {mid} published successfully")

def on_subscribe(client, userdata, mid, granted_qos):
    print(f"Subscribed with QoS: {granted_qos}")

# Set callbacks
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
client.on_subscribe = on_subscribe
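
The reconnection logic mentioned in `on_disconnect` usually waits longer after each failed attempt so a struggling broker is not flooded. The sketch below computes such an exponential backoff schedule; the function name and default values are assumptions for illustration. paho-mqtt also offers `reconnect_delay_set()` to bound the delay used by its own network loop, which may be enough on its own.

```python
from typing import List


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 60.0) -> List[float]:
    """Return the wait in seconds before each reconnect attempt.

    The delay doubles with every attempt and never exceeds `cap`.
    """
    return [min(cap, base * 2 ** n) for n in range(attempts)]
```

A reconnect loop would sleep for each delay in turn and call `client.reconnect()` until it succeeds. Subscribing inside `on_connect`, as shown above, ensures subscriptions are restored after every reconnection.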

Security

TLS/SSL Connection

python
import ssl

# Enable TLS (file names are placeholders for your CA bundle, client certificate, and private key)
client.tls_set(
    ca_certs="ca-certificates.crt",
    certfile="client-cert.pem",
    keyfile="client-key.pem",
    cert_reqs=ssl.CERT_REQUIRED,
    tls_version=ssl.PROTOCOL_TLSv1_2
)

# Connect with TLS
client.connect("your-namespace.flowmq.io", 8883, 60)

Authentication

python
# Username/password authentication
client.username_pw_set("username", "password")

# Client certificate authentication
client.tls_set(certfile="client-cert.pem", keyfile="client-key.pem")

Best Practices

  1. Use Appropriate QoS Levels

    • QoS 0: For non-critical data (sensor readings, logs)
    • QoS 1: For important data (orders, notifications)
    • QoS 2: For critical data (payments, transactions)
  2. Design Topic Hierarchy

    • Use consistent naming patterns
    • Keep topics short and descriptive
    • Plan for scalability
  3. Handle Reconnections

    • Implement automatic reconnection
    • Resubscribe to topics after reconnection
    • Use persistent sessions when needed
  4. Monitor Message Size

    • Keep messages under 256KB
    • Use compression for large payloads
    • Consider chunking for very large messages
  5. Use Retained Messages Wisely

    • Only retain essential configuration data
    • Clear retained messages when no longer needed
    • Avoid retaining frequently changing data
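
The message-size advice above can be enforced at the point where payloads are encoded. The following sketch serializes to compact JSON, compresses larger payloads with zlib, and refuses anything that still reaches the 256 KB guideline. The helper name, the 1 KB compression threshold, and the use of zlib are assumptions for illustration; subscribers would need to know a payload is compressed, for example through an MQTT v5.0 user property.

```python
import json
import zlib
from typing import Any, Tuple

MAX_MESSAGE_BYTES = 256 * 1024  # size guideline from the list above


def encode_payload(data: Any, compress_over: int = 1024) -> Tuple[bytes, bool]:
    """Serialize to JSON, compressing payloads larger than `compress_over` bytes.

    Returns (payload, compressed). Raises ValueError if the result is still too large.
    """
    raw = json.dumps(data, separators=(",", ":")).encode("utf-8")
    compressed = len(raw) > compress_over
    payload = zlib.compress(raw) if compressed else raw
    if len(payload) >= MAX_MESSAGE_BYTES:
        raise ValueError(f"payload is {len(payload)} bytes; consider chunking")
    return payload, compressed
```

Small sensor readings pass through unchanged, while bulk uploads are compressed before the size check is applied.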

Client Libraries

FlowMQ works with any standard MQTT client library:

  • Python: paho-mqtt, aiomqtt (formerly asyncio-mqtt)
  • JavaScript: MQTT.js, paho-mqtt
  • Java: Eclipse Paho, HiveMQ MQTT Client
  • C/C++: Eclipse Paho C, Mosquitto
  • Go: paho.mqtt.golang
  • Rust: rumqttc, paho-mqtt-rust

Connection Parameters

Parameter      Default                    Description
Host           your-namespace.flowmq.io   FlowMQ service host
Port           1883                       MQTT port (TLS: 8883)
Keep Alive     60 s                       Connection keep-alive interval
Clean Session  true                       Start with clean session
Client ID      Auto-generated             Unique client identifier
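
The defaults in the table can be captured in one place so every client in an application connects the same way. The dataclass below is a hypothetical convenience wrapper and is not provided by FlowMQ or paho-mqtt; the `use_tls` flag is an assumption of this sketch that selects between the two ports listed above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ConnectionSettings:
    """Connection parameters with the defaults from the table above."""

    host: str = "your-namespace.flowmq.io"
    keepalive: int = 60                # seconds
    clean_session: bool = True
    client_id: Optional[str] = None    # None means no explicit ID is set
    use_tls: bool = False

    @property
    def port(self) -> int:
        # 8883 for TLS connections, 1883 otherwise
        return 8883 if self.use_tls else 1883
```

A client could then be connected with `client.connect(settings.host, settings.port, settings.keepalive)`, mirroring the `connect()` calls used throughout this page.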

Next Steps