Connect FlowMQ with the Python SDK
The Python bindings of the Flow SDK are built on asyncio to provide a modern, high-performance, non-blocking interface for messaging.
Prerequisites
- Python 3.7+: The SDK uses modern async features.
- Running FlowMQ Instance: You need access to a FlowMQ broker. For this guide, we'll assume it's running at localhost:7070.
Installation
The Python Flow SDK is available on PyPI. You can install it using pip:
pip install flowmq-sdk
Core Concepts
Before connecting, it's helpful to understand the main components of the SDK:
- FlowClient: The primary object for managing your connection to the FlowMQ cluster. It's asynchronous and designed to be created once and reused throughout your application's lifecycle.
- Authentication: A helper object for configuring credentials. The SDK currently supports token-based authentication.
- Producer: An object created from the FlowClient that is responsible for publishing messages.
- Consumer: An object responsible for subscribing to topics, queues, or streams to receive messages.
- Message: A simple data class representing a message, containing a payload (bytes) and headers (dict).
- Queue & Stream: Objects that define the source you want to consume messages from.
Establishing a Connection
Connecting to FlowMQ is an asynchronous operation. The recommended way to manage the client's lifecycle is with an async with block, which handles connection and disconnection automatically.
Configuration
First, import the necessary classes and create a configuration dictionary.
import asyncio

from flowmq import FlowClient, Authentication


async def main():
    config = {
        "broker_address": "localhost:7070",
        "authentication": Authentication.token("YOUR_API_TOKEN")
    }

    # The client will automatically connect on entering the 'async with'
    # block and disconnect on exit.
    async with FlowClient(config) as client:
        print("Successfully connected to FlowMQ!")
        # Your application logic goes here
        # e.g., publish or consume messages


if __name__ == "__main__":
    asyncio.run(main())
Replace YOUR_API_TOKEN with your actual credentials.
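For readers new to async with, the lifecycle described above can be illustrated without a broker. The sketch below uses DemoClient, a hypothetical stand-in class that is not part of the Flow SDK, to show that the exit hook runs even when the body raises. How FlowClient implements its own connect and disconnect steps is not shown here.

```python
import asyncio


class DemoClient:
    """Hypothetical stand-in for a client; it is NOT the FlowMQ SDK."""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        # Runs when the 'async with' block is entered.
        self.events.append("connect")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs when the block exits, even if the body raised.
        self.events.append("disconnect")
        return False  # do not swallow the exception


async def run_demo():
    client = DemoClient()
    try:
        async with client:
            client.events.append("work")
            raise RuntimeError("simulated failure")
    except RuntimeError:
        client.events.append("error handled")
    return client.events


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

The recorded order is connect, work, disconnect, error handled: cleanup happens before the exception reaches the caller, which is why the async with form is usually preferred over manual connect and close calls.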
Publishing Messages
To publish messages, you create a Producer from the client.
Example: Publishing a Message to a Topic
import asyncio

from flowmq import FlowClient, Authentication, Message, FlowPublishException


async def publish_example():
    config = {
        "broker_address": "localhost:7070",
        "authentication": Authentication.token("YOUR_API_TOKEN")
    }

    async with FlowClient(config) as client:
        # 1. Create a producer
        producer = client.new_producer()

        # 2. Construct a message
        message = Message(
            payload=b'Hello from Python!',
            headers={'content-type': 'text/plain'}
        )

        # 3. Publish to a topic
        try:
            receipt = await producer.publish("sensor.v1.data", message)
            print(f"Message published successfully! ID: {receipt.message_id}")
        except FlowPublishException as e:
            print(f"Failed to publish message: {e}")


if __name__ == "__main__":
    asyncio.run(publish_example())
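Because a message payload is bytes, structured data has to be serialized before it is published. The following is a minimal sketch using only the standard library; encode_json and decode_json are hypothetical helper names, not SDK functions, and the application/json header value is a common convention rather than something the SDK is known to require.

```python
import json
from typing import Any, Dict, Tuple


def encode_json(data: Any) -> Tuple[bytes, Dict[str, str]]:
    """Serialize data to UTF-8 JSON bytes plus a matching content-type header."""
    payload = json.dumps(data, separators=(",", ":")).encode("utf-8")
    headers = {"content-type": "application/json"}
    return payload, headers


def decode_json(payload: bytes) -> Any:
    """Reverse of encode_json: parse UTF-8 JSON bytes back into Python objects."""
    return json.loads(payload.decode("utf-8"))


if __name__ == "__main__":
    payload, headers = encode_json({"sensor": "t1", "value": 21.5})
    print(payload, headers)
    print(decode_json(payload))
```

The returned pair could then be passed to the Message constructor as its payload and headers arguments, in the same way as the literal values in the publishing example.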
Consuming Messages
You can consume messages from either a Queue (for point-to-point, work-sharing patterns) or a Stream (for ordered, replayable event logs).
Consuming from a Queue
When consuming from a queue, you must explicitly acknowledge (ack) each message after you have successfully processed it. This tells FlowMQ that the message can be safely removed from the queue.
import asyncio

from flowmq import FlowClient, Authentication, Queue


async def consume_from_queue():
    config = {
        "broker_address": "localhost:7070",
        "authentication": Authentication.token("YOUR_API_TOKEN")
    }

    # Define the queue to consume from
    task_queue = Queue("image.processing.tasks")

    # Define a message handler callback
    async def handler(message):
        print(f"Received from queue: {message.payload.decode()}")
        # Process the message...
        print("Processing complete. Acknowledging message.")
        await message.ack()

    async with FlowClient(config) as client:
        print(f"Starting consumer on queue: {task_queue.name}...")
        # The consume method will run until the task is cancelled
        await client.consume(task_queue, handler)


if __name__ == "__main__":
    try:
        asyncio.run(consume_from_queue())
    except KeyboardInterrupt:
        print("Consumer stopped.")
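To make the role of ack concrete, here is a toy in-memory model of a work queue. ToyQueue and its methods are hypothetical and say nothing about FlowMQ's internals. The sketch assumes, as many brokers do, that a message handed out but never acknowledged becomes available again; whether and when FlowMQ redelivers is an assumption here, not a documented guarantee.

```python
from collections import deque


class ToyQueue:
    """In-memory toy queue; a hypothetical model, not FlowMQ's implementation."""

    def __init__(self, items):
        self._ready = deque(items)
        self._in_flight = {}  # delivery id -> payload awaiting ack
        self._next_id = 0

    def receive(self):
        """Hand out the next message and remember it until it is acked."""
        if not self._ready:
            return None
        self._next_id += 1
        payload = self._ready.popleft()
        self._in_flight[self._next_id] = payload
        return self._next_id, payload

    def ack(self, delivery_id):
        """Processing succeeded: forget the message for good."""
        self._in_flight.pop(delivery_id)

    def requeue_unacked(self):
        """Simulate a consumer crash: unacked messages become ready again."""
        for delivery_id in sorted(self._in_flight):
            self._ready.append(self._in_flight[delivery_id])
        self._in_flight.clear()

    def pending(self):
        return list(self._ready)


def run_demo():
    queue = ToyQueue([b"resize-1", b"resize-2"])
    first_id, _ = queue.receive()
    queue.ack(first_id)      # processed successfully, removed for good
    queue.receive()          # received, but the consumer "crashes" before ack
    queue.requeue_unacked()
    return queue.pending()


if __name__ == "__main__":
    print(run_demo())
```

Under this model, acknowledging only after processing has finished means a crash mid-processing loses no work, at the cost of possibly handling the same message twice, so handlers are best written to tolerate repeats.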
Consuming from a Stream
When consuming from a stream, you track your progress by committing offsets. This allows your consumer to resume from where it left off after a restart.
import asyncio

from flowmq import FlowClient, Authentication, Stream, StartPosition


async def consume_from_stream():
    config = {
        "broker_address": "localhost:7070",
        "authentication": Authentication.token("YOUR_API_TOKEN")
    }

    # Define the stream and where to start consuming
    # Options: StartPosition.EARLIEST, StartPosition.LATEST
    event_stream = Stream("user.activity.events", start_position=StartPosition.EARLIEST)

    # Define a message handler
    async def handler(message):
        print(f"Received from stream (offset: {message.offset}): {message.payload.decode()}")
        # Process the message...
        print("Processing complete. Committing offset.")
        await message.commit_offset()

    async with FlowClient(config) as client:
        print(f"Starting consumer on stream: {event_stream.name}...")
        await client.consume(event_stream, handler)


if __name__ == "__main__":
    try:
        asyncio.run(consume_from_stream())
    except KeyboardInterrupt:
        print("Consumer stopped.")
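The resume-after-restart behaviour of committed offsets can be sketched with a toy append-only log. ToyStream, its methods, and the per-consumer bookkeeping are hypothetical illustrations; how FlowMQ actually stores and scopes committed offsets is not covered by this guide.

```python
class ToyStream:
    """Toy append-only log with committed offsets; not FlowMQ's implementation."""

    def __init__(self, records):
        self._records = list(records)
        self._committed = {}  # consumer name -> next offset to read

    def read_from_committed(self, consumer):
        """Return (offset, record) pairs starting at the committed position."""
        start = self._committed.get(consumer, 0)
        return [(offset, self._records[offset])
                for offset in range(start, len(self._records))]

    def commit(self, consumer, offset):
        """Record that everything up to and including 'offset' is processed."""
        self._committed[consumer] = offset + 1


def consume(stream, consumer, limit):
    """Process at most 'limit' records, committing after each one."""
    processed = []
    for offset, record in stream.read_from_committed(consumer)[:limit]:
        processed.append(record)
        stream.commit(consumer, offset)
    return processed


def run_demo():
    stream = ToyStream(["login", "view", "click", "logout"])
    first_run = consume(stream, "worker", limit=2)    # then the process "restarts"
    second_run = consume(stream, "worker", limit=10)  # resumes after last commit
    return first_run, second_run


if __name__ == "__main__":
    print(run_demo())
```

Unlike the queue case, records are not removed once processed: a different consumer name in this model would start again from offset 0, which is what makes a stream replayable.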
Full End-to-End Example
This example combines publishing and consuming to demonstrate a complete workflow. We will publish a message to a topic and then consume it from a queue that is subscribed to that topic.
import asyncio

from flowmq import FlowClient, Authentication, Message, Queue, FlowPublishException


async def main():
    """
    A full example that connects, produces a message to a topic,
    and consumes it from a queue.
    """
    config = {
        "broker_address": "localhost:7070",
        "authentication": Authentication.token("YOUR_API_TOKEN")
    }
    client = FlowClient(config)

    try:
        await client.connect()

        # 1. Producer: Publish a message
        producer = client.new_producer()
        message = Message(payload=b"End-to-End Test Message!")
        try:
            receipt = await producer.publish("e2e.topic", message)
            print(f"Message published successfully. ID: {receipt.message_id}")
        except FlowPublishException as e:
            print(f"Failed to publish message: {e}")
            return

        # 2. Consumer: Consume from a queue
        # (Assumes 'e2e.topic' is routed to 'e2e.queue' in FlowMQ)
        e2e_queue = Queue("e2e.queue")
        message_received = asyncio.Event()

        async def message_handler(msg):
            print(f"Received message: {msg.payload.decode()}")
            await msg.ack()
            message_received.set()

        # Start consuming in the background
        consumer_task = asyncio.create_task(client.consume(e2e_queue, message_handler))
        print("Consumer started. Waiting for message...")

        # Wait for the message to be received or timeout
        try:
            await asyncio.wait_for(message_received.wait(), timeout=10.0)
            print("Successfully received and acknowledged message.")
        except asyncio.TimeoutError:
            print("Timed out waiting for message.")
        finally:
            consumer_task.cancel()  # Stop the consumer task
    finally:
        await client.close()
        print("Client closed.")


if __name__ == "__main__":
    asyncio.run(main())
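Calling cancel() on a task only requests cancellation; the task finishes unwinding the next time the event loop runs it. If cleanup in the consumer needs to complete before the client is closed, one option is to await the task after cancelling it. The sketch below shows that pattern with plain asyncio; fake_consume is a hypothetical stand-in for a long-running consume call, and stop_task is an illustrative helper, not an SDK function.

```python
import asyncio


async def fake_consume(log):
    """Stand-in for a long-running consume loop (hypothetical)."""
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append("consumer cleaned up")
        raise  # re-raise so the task is marked as cancelled


async def stop_task(task):
    """Request cancellation, then wait until the task has actually finished."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task ended because we cancelled it


async def run_demo():
    log = []
    task = asyncio.create_task(fake_consume(log))
    await asyncio.sleep(0.03)  # let the consumer run briefly
    await stop_task(task)
    log.append("cancelled" if task.cancelled() else "not cancelled")
    return log


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

In the end-to-end example, the same idea would mean awaiting consumer_task after cancelling it and before the client is closed.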
Additional Resources
You now have a solid foundation for using the Flow SDK for Python. From here, you can explore:
- Error Handling: Wrap SDK calls in try...except blocks to handle specific exceptions like FlowConnectionError or FlowTimeoutError.
- Advanced Configuration: Look into the FlowClient configuration options for setting timeouts, reconnection policies, and TLS.
- Flow SDK Overview: For a broader understanding of the SDK's architecture and concepts, refer to the Flow SDK Developer Guide.
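As one way to act on the error-handling suggestion above, transient failures can be retried with exponential backoff. This is a sketch under stated assumptions: DemoConnectionError is a hypothetical stand-in for an SDK exception such as FlowConnectionError, retry is an illustrative helper rather than an SDK feature, and the attempt count and delays are arbitrary. The SDK's own reconnection policies, if configured, may make such a wrapper unnecessary.

```python
import asyncio


class DemoConnectionError(Exception):
    """Hypothetical stand-in for an SDK error such as FlowConnectionError."""


async def retry(operation, retryable=(DemoConnectionError,), attempts=3, base_delay=0.01):
    """Await operation(), retrying on the given exceptions with doubling delays."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retryable:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


async def run_demo():
    calls = []

    async def flaky_publish():
        # Fails twice, then succeeds, to exercise the retry loop.
        calls.append(1)
        if len(calls) < 3:
            raise DemoConnectionError("broker unreachable")
        return "published"

    result = await retry(flaky_publish)
    return result, len(calls)


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

Only exceptions listed in retryable are retried; anything else propagates immediately, which keeps programming errors from being masked as connectivity problems.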