Connect FlowMQ with MQTT Python Client
FlowMQ's native support for the MQTT protocol makes it easy for Python developers to build powerful, real-time applications. This guide will walk you through using the popular Eclipse Paho Python Client to connect to FlowMQ, publish messages, and subscribe to topics.
Prerequisites
Before you start, ensure you have the following:
- A running instance of FlowMQ.
- Python 3.6 or later installed.
- pip for installing packages.
Installation
You can install the Paho MQTT client using pip.
pip install paho-mqtt
Connecting to FlowMQ
To get started, you need to create a client instance and define callback functions for connection and message events.
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to FlowMQ Broker!")
    else:
        print(f"Failed to connect, return code {rc}\n")

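# Note: on paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION1 as the first
# argument to Client() to keep the callback signature used above.
client = mqtt.Client(client_id="my-python-client")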
client.on_connect = on_connect
client.connect("localhost", 1883, 60)
# Start the network loop to process network traffic and dispatch callbacks
client.loop_start()
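The `rc` value passed to `on_connect` is the CONNACK return code. As a small sketch, assuming an MQTT 3.1.1 connection (the protocol version Paho uses by default) and the callback signature shown above, the codes can be mapped to readable text. The helper name `describe_connack` is hypothetical; Paho also provides `mqtt.connack_string(rc)` for a similar purpose.

```python
# CONNACK return codes defined by the MQTT 3.1.1 specification.
CONNACK_CODES = {
    0: "Connection accepted",
    1: "Connection refused: unacceptable protocol version",
    2: "Connection refused: identifier rejected",
    3: "Connection refused: server unavailable",
    4: "Connection refused: bad user name or password",
    5: "Connection refused: not authorized",
}


def describe_connack(rc):
    """Return a human-readable description for a CONNACK return code."""
    return CONNACK_CODES.get(rc, f"Unknown return code {rc}")
```

Inside `on_connect`, the failure branch could then print `describe_connack(rc)` instead of the bare number.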
Subscribing to Topics
To receive messages, subscribe to a topic. The on_message callback will be triggered when a message arrives.
def on_message(client, userdata, msg):
    print(f"Received message on topic {msg.topic}: {msg.payload.decode()}")

client.on_message = on_message
client.subscribe("flowmq/test/python")
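`msg.payload` arrives as `bytes`, so `msg.payload.decode()` raises `UnicodeDecodeError` if a publisher sends data that is not valid UTF-8. A minimal sketch of a more defensive handler follows; the helper name `payload_to_text` is hypothetical and not part of Paho, and the hex fallback is just one possible choice.

```python
def payload_to_text(payload: bytes) -> str:
    """Decode an MQTT payload as UTF-8, falling back to a hex string for binary data."""
    try:
        return payload.decode("utf-8")
    except UnicodeDecodeError:
        # Not valid UTF-8: show the raw bytes as hexadecimal instead of failing.
        return payload.hex()


def on_message(client, userdata, msg):
    # msg.topic is a str; msg.payload is bytes.
    print(f"Received message on topic {msg.topic}: {payload_to_text(msg.payload)}")
```

Assigning this `on_message` to `client.on_message` keeps the callback from raising on unexpected binary payloads.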
Publishing Messages
Once connected, you can easily publish messages to any topic.
client.publish("flowmq/test/python", "Hello FlowMQ from Python!")
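`publish()` returns before the message has necessarily left the client. As a hedged sketch, the message-info object it returns can be used to wait for delivery. `publish_and_wait` is a hypothetical helper, and it assumes a paho-mqtt release whose `wait_for_publish()` accepts a `timeout` argument (recent releases do). The network loop, for example `client.loop_start()`, must be running for the wait to complete.

```python
def publish_and_wait(client, topic, payload, qos=1, timeout=5.0):
    """Publish a message and wait until the client reports it as published.

    Returns True if the message was published within the timeout, else False.
    """
    info = client.publish(topic, payload, qos=qos)
    # Blocks until delivery completes or the timeout (in seconds) expires.
    info.wait_for_publish(timeout)
    return info.is_published()
```

For example, `publish_and_wait(client, "flowmq/test/python", "Hello FlowMQ from Python!")` would return `True` once the broker has acknowledged the QoS 1 message. Paho may raise an exception from `wait_for_publish()` if the message could not be queued, so production code would likely wrap the call in error handling.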
Full Example
Here is a complete Python script that connects, subscribes, publishes a message, and handles the incoming message.
import paho.mqtt.client as mqtt
import time
# --- Callback Functions ---
def on_connect(client, userdata, flags, rc):
    """The callback for when the client receives a CONNACK response from the server."""
    if rc == 0:
        print("Connected successfully!")
        # Subscribe to a topic upon connection
        client.subscribe("flowmq/python/test")
    else:
        print(f"Connection failed with code {rc}")

def on_message(client, userdata, msg):
    """The callback for when a PUBLISH message is received from the server."""
    print(f"Received message on topic '{msg.topic}': {msg.payload.decode()}")
    # You might want to stop the loop after receiving a message in a simple script
    client.loop_stop()

# --- Main Script ---
# Create a client instance
client = mqtt.Client(client_id="python-example-client")
# Assign callbacks
client.on_connect = on_connect
client.on_message = on_message
# Connect to the broker
client.connect("localhost", 1883, 60)
# Start a non-blocking network loop
client.loop_start()
# Publish a message after a short delay to give the subscription time to become active
time.sleep(1)
print("Publishing message...")
client.publish("flowmq/python/test", "A test message from Paho Python.")
# Keep the script running to listen for messages
# The on_message callback will stop the loop.
# For a continuous listener, you might use client.loop_forever()
time.sleep(2)
client.disconnect()
print("Disconnected.")
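The full example relies on `time.sleep()` to give the connection and subscription time to complete. One possible alternative, sketched here under assumptions, waits on `threading.Event` objects that the callbacks set. The function `round_trip` and its parameters are hypothetical; the callback signatures match the ones used in this guide (Paho's version 1 callback API with MQTT 3.1.1).

```python
import threading


def round_trip(client, topic, payload, host="localhost", port=1883, timeout=5.0):
    """Subscribe, publish one message, and return the payload received.

    Returns None if the subscription or the message does not arrive in time.
    """
    subscribed = threading.Event()
    received = threading.Event()
    result = {}

    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            client.subscribe(topic)

    def on_subscribe(client, userdata, mid, granted_qos):
        # The broker acknowledged the subscription (SUBACK).
        subscribed.set()

    def on_message(client, userdata, msg):
        result["payload"] = msg.payload
        received.set()

    client.on_connect = on_connect
    client.on_subscribe = on_subscribe
    client.on_message = on_message

    client.connect(host, port, 60)
    client.loop_start()
    try:
        # Publish only once the subscription is confirmed, not after a fixed delay.
        if not subscribed.wait(timeout):
            return None
        client.publish(topic, payload)
        if not received.wait(timeout):
            return None
        return result["payload"]
    finally:
        client.disconnect()
        client.loop_stop()
```

With a real broker this might be called as `round_trip(mqtt.Client(client_id="python-example-client"), "flowmq/python/test", "A test message from Paho Python.")`. Waiting on events removes the guesswork of fixed delays, at the cost of a little more code.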
Additional Resources
- For more advanced features, such as different QoS levels and retained messages, refer to the official Eclipse Paho Python Client documentation.
- Explore FlowMQ's advanced features for MQTT.