Skip to content

Connect FlowMQ with Kafka Python SDK

FlowMQ's support for the Apache Kafka protocol allows you to leverage a rich ecosystem of client libraries. This guide provides a step-by-step walkthrough on using Confluent's Python Client for Apache Kafka to connect, produce, and consume messages from FlowMQ. This client is a reliable, high-performance library that wraps the underlying librdkafka.

Prerequisites

Before you start, ensure you have the following:

  • A running instance of FlowMQ.
  • Python 3.6 or later installed.
  • pip, the Python package installer.

Installation

You can install the Confluent Kafka client using pip.

bash
pip install confluent-kafka

For more details, you can refer to the official installation guide [1].

Producer: Connecting and Sending Messages

A producer is used to send messages to a Kafka topic in FlowMQ.

Configuration and Connection

First, create a Producer instance. The only mandatory configuration is bootstrap.servers, which should point to your FlowMQ broker.

python
from confluent_kafka import Producer

# Create Producer instance
p = Producer({'bootstrap.servers': 'localhost:9092'})

Producing a Message

To send a message, call the produce() method with a topic and message value. It's recommended to implement a delivery report callback to confirm that messages have been delivered successfully.

python
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result. """
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

# Asynchronously produce a message.
topic = 'my_topic'
p.produce(topic, 'Hello, FlowMQ with Python!', callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()

Consumer: Connecting and Receiving Messages

A consumer subscribes to topics and processes the messages sent to them.

Configuration and Connection

Create a Consumer instance with the bootstrap.servers and a group.id. The group.id is a unique string that identifies the consumer group this consumer belongs to.

python
from confluent_kafka import Consumer, KafkaError

# Create Consumer instance
c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest'
})

Subscribing and Consuming

After creating a consumer, subscribe it to a topic and then enter a loop to poll for new messages.

python
# Subscribe to topic
topic = 'my_topic'
c.subscribe([topic])

try:
    while True:
        msg = c.poll(1.0) # Timeout of 1 second

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print('%% %s [%d] reached end at offset %d\n' %
                      (msg.topic(), msg.partition(), msg.offset()))
            else:
                print(f"Consumer error: {msg.error()}")
            continue

        print(f'Received message: {msg.value().decode("utf-8")}')

except KeyboardInterrupt:
    print("Aborted by user")

finally:
    # Close down consumer to commit final offsets.
    c.close()

Additional Resources

  • For more in-depth information and advanced usage, refer to the official Confluent Kafka Python Client repository [1].
  • Explore FlowMQ's advanced features for Kafka integration.

Reference: [1] Confluent Inc. (2025). confluent-kafka-python. GitHub. Retrieved from https://github.com/confluentinc/confluent-kafka-python