Connect FlowMQ with Kafka Python SDK
FlowMQ's support for the Apache Kafka protocol lets you leverage a rich ecosystem of client libraries. This guide provides a step-by-step walkthrough of using Confluent's Python Client for Apache Kafka to connect to FlowMQ, produce messages, and consume them. The client is a reliable, high-performance library that wraps the underlying `librdkafka` C library.
Prerequisites
Before you start, ensure you have the following:
- A running instance of FlowMQ.
- Python 3.6 or later installed.
- `pip`, the Python package installer.
Installation
You can install the Confluent Kafka client using `pip`:

```shell
pip install confluent-kafka
```
For more details, you can refer to the official installation guide [1].
Producer: Connecting and Sending Messages
A producer is used to send messages to a Kafka topic in FlowMQ.
Configuration and Connection
First, create a `Producer` instance. The only mandatory configuration is `bootstrap.servers`, which should point to your FlowMQ broker.
```python
from confluent_kafka import Producer

# Create a Producer instance pointing at the FlowMQ broker
p = Producer({'bootstrap.servers': 'localhost:9092'})
```
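Beyond `bootstrap.servers`, the producer accepts many optional `librdkafka` settings. A minimal sketch of some commonly tuned ones (the values below are illustrative, not FlowMQ-specific recommendations):

```python
# Illustrative optional settings; all keys are standard librdkafka
# producer configuration properties.
producer_config = {
    'bootstrap.servers': 'localhost:9092',   # FlowMQ broker address
    'client.id': 'flowmq-python-producer',   # identifies this client in broker-side logs
    'acks': 'all',                           # wait for all in-sync replicas to acknowledge
    'linger.ms': 5,                          # wait up to 5 ms to batch messages together
    'compression.type': 'lz4',               # compress batches on the wire
}
```

Pass the dict to `Producer(producer_config)` exactly as in the minimal example above.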
Producing a Message
To send a message, call the `produce()` method with a topic and a message value. It's recommended to implement a delivery report callback to confirm that messages have been delivered successfully.
```python
def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

# Asynchronously produce a message.
topic = 'my_topic'
p.produce(topic, 'Hello, FlowMQ with Python!', callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()
```
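`produce()` is asynchronous and only enqueues the message in a local buffer; if you produce faster than the client can send, it raises `BufferError`. A common pattern is a small helper that serves delivery callbacks while producing and retries when the queue is full. A sketch, where `produce_batch` is a hypothetical helper that works with any `confluent_kafka.Producer`-like object:

```python
def produce_batch(producer, topic, values, callback=None):
    """Produce a batch of values, serving delivery callbacks as we go
    and backing off when the client's local queue fills up."""
    for value in values:
        while True:
            try:
                producer.produce(topic, value, callback=callback)
                break
            except BufferError:
                # Local queue is full: serve delivery callbacks to free
                # space, then retry the same message.
                producer.poll(0.5)
        # Serve any delivery callbacks that are already ready.
        producer.poll(0)
    # Block until every queued message is delivered (or fails).
    producer.flush()
```

With the producer above, you would call `produce_batch(p, topic, messages, callback=delivery_report)`.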
Consumer: Connecting and Receiving Messages
A consumer subscribes to topics and processes the messages sent to them.
Configuration and Connection
Create a `Consumer` instance with `bootstrap.servers` and a `group.id`. The `group.id` is a unique string that identifies the consumer group this consumer belongs to.
```python
from confluent_kafka import Consumer, KafkaError

# Create Consumer instance
c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest'
})
```
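Note that `auto.offset.reset` only applies when the group has no committed offset for a partition: `'earliest'` starts from the beginning of the partition, while `'latest'` (the default) delivers only new messages. A sketch of a consumer config that also enables manual offset commits (values are illustrative):

```python
# Illustrative consumer settings; all keys are standard librdkafka
# consumer configuration properties.
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest',  # replay from the start when no offset is committed
    'enable.auto.commit': False,      # commit offsets explicitly after processing
}
```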
Subscribing and Consuming
After creating a consumer, subscribe it to a topic and then enter a loop to poll for new messages.
```python
# Subscribe to topic
topic = 'my_topic'
c.subscribe([topic])

try:
    while True:
        msg = c.poll(1.0)  # Timeout of 1 second
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print('%% %s [%d] reached end at offset %d\n' %
                      (msg.topic(), msg.partition(), msg.offset()))
            else:
                print(f"Consumer error: {msg.error()}")
            continue
        print(f'Received message: {msg.value().decode("utf-8")}')
except KeyboardInterrupt:
    print("Aborted by user")
finally:
    # Close down consumer to commit final offsets.
    c.close()
```
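If you disable auto-commit (`'enable.auto.commit': False`), you can commit each offset only after the message has been processed, which gives at-least-once delivery. A minimal sketch of that pattern; `consume_loop` is a hypothetical helper, and `max_messages` exists only to bound the loop:

```python
def consume_loop(consumer, topics, handle, max_messages=None):
    """Poll for messages, process each with handle(), and commit the offset
    only after processing succeeds (at-least-once semantics)."""
    consumer.subscribe(topics)
    processed = 0
    try:
        while max_messages is None or processed < max_messages:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                continue  # real code would inspect msg.error() as shown above
            handle(msg)
            # Synchronously commit this message's offset after processing.
            consumer.commit(message=msg, asynchronous=False)
            processed += 1
    finally:
        # Closing also leaves the consumer group cleanly.
        consumer.close()
    return processed
```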
Additional Resources
- For more in-depth information and advanced usage, refer to the official Confluent Kafka Python Client repository [1].
- Explore FlowMQ's advanced features for Kafka integration.
Reference: [1] Confluent Inc. (2025). confluent-kafka-python. GitHub. Retrieved from https://github.com/confluentinc/confluent-kafka-python