Kafka Protocol

FlowMQ provides native Apache Kafka protocol support, so you can use standard Kafka clients and tools to connect, produce, and consume messages. Kafka topics in FlowMQ map to streams, which provide high-throughput, durable message processing with replay capabilities.

Overview

Kafka concepts map onto FlowMQ's unified routing model:

  • Kafka Topics map to Streams
  • Kafka Partitions provide parallel processing
  • Kafka Consumer Groups enable load balancing

Connection

Connect to FlowMQ using standard Kafka clients:

python
import json

from kafka import KafkaProducer, KafkaConsumer

# Producer configuration
producer = KafkaProducer(
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Consumer configuration
consumer = KafkaConsumer(
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='my-consumer-group',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

Producing Messages

Basic Producing

python
import json

from kafka import KafkaProducer

# Create producer (key_serializer is needed below for string keys)
producer = KafkaProducer(
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send message
future = producer.send('orders', {'order_id': '12345', 'amount': 99.99})
record_metadata = future.get(timeout=10)
print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition}")

# Send with key for partitioning
producer.send('user-events', key='user-123', value={'action': 'login'})

# Flush to ensure delivery
producer.flush()
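
Beyond the defaults, kafka-python exposes the standard Kafka delivery settings (acks, retries, batching). A minimal sketch of a more durable producer, assuming FlowMQ follows normal Kafka acknowledgment semantics:

python
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    acks='all',        # wait for the broker to fully acknowledge the write
    retries=5,         # retry transient send failures automatically
    linger_ms=5,       # batch sends for up to 5 ms for better throughput
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Blocking send: raises on failure instead of silently dropping the message
producer.send('orders', {'order_id': '12345', 'amount': 99.99}).get(timeout=10)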

Producing with Headers

python
# Send with headers
headers = [
    ('source', b'order-service'),
    ('version', b'1.0'),
    ('priority', b'high')
]

producer.send(
    'orders',
    value={'order_id': '12345', 'amount': 99.99},
    headers=headers
)
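
Headers arrive on the consumer side as a list of (key, value) tuples with raw-bytes values. A minimal sketch of reading them back, assuming a consumer subscribed to the topic as in the Connection section:

python
# Headers are (str, bytes) tuples on each record
for message in consumer:
    headers = dict(message.headers or [])
    source = headers.get('source', b'unknown').decode('utf-8')
    print(f"Message from {source}: {message.value}")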

Consuming Messages

Basic Consuming

python
from kafka import KafkaConsumer
import json

# Create consumer
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Consume messages
for message in consumer:
    print(f"Received: {message.value}")
    print(f"Topic: {message.topic}")
    print(f"Partition: {message.partition}")
    print(f"Offset: {message.offset}")
    # Process the message
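
If you need bounded waits rather than a blocking iterator (for example, to check a shutdown flag between batches), poll() returns whatever records arrived within a timeout. A minimal sketch using the consumer above:

python
# Poll-based loop: each call returns {TopicPartition: [records]}, or an
# empty dict if nothing arrived within the timeout
running = True
while running:
    batches = consumer.poll(timeout_ms=1000)
    for tp, records in batches.items():
        for record in records:
            print(f"{tp.topic}[{tp.partition}] offset {record.offset}: {record.value}")
    # flip `running` from a signal handler or shutdown hook to exit cleanly
consumer.close()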

Consumer Groups

python
# Multiple consumers in the same group
consumer1 = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',  # Same group for load balancing
    auto_offset_reset='earliest'
)

consumer2 = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',  # Same group for load balancing
    auto_offset_reset='earliest'
)

# Partitions are divided across the group, so each consumer
# receives a distinct subset of messages
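
In production each group member typically runs as its own process; for illustration, a minimal sketch that runs two group members as threads (the worker function is ours, not part of any API):

python
import json
import threading
from kafka import KafkaConsumer

def run_worker(worker_id):
    # Each worker joins the same group and is assigned its own share of partitions
    consumer = KafkaConsumer(
        'orders',
        bootstrap_servers=['your-namespace.flowmq.io:9092'],
        group_id='order-processors',
        auto_offset_reset='earliest',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    for message in consumer:
        print(f"worker {worker_id}: partition {message.partition} -> {message.value}")

workers = [threading.Thread(target=run_worker, args=(i,)) for i in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()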

Manual Offset Management

python
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',
    enable_auto_commit=False,  # Disable auto-commit
    auto_offset_reset='earliest'
)

for message in consumer:
    try:
        # Process message
        process_order(message.value)
        
        # Manually commit offset
        consumer.commit()
        
    except Exception as e:
        print(f"Error processing message: {e}")
        # Don't commit on error; the offset stays uncommitted, so the
        # message is redelivered after a restart or rebalance

Topic Management

Creating Topics

python
from kafka.admin import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(
    bootstrap_servers=['your-namespace.flowmq.io:9092']
)

# Create topic
topic_list = [
    NewTopic(
        name='orders',
        num_partitions=3,
        replication_factor=1
    )
]

admin_client.create_topics(topic_list)
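
create_topics raises if the topic already exists, so idempotent setup code usually catches that case. A minimal sketch:

python
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin_client = KafkaAdminClient(
    bootstrap_servers=['your-namespace.flowmq.io:9092']
)

try:
    admin_client.create_topics([
        NewTopic(name='orders', num_partitions=3, replication_factor=1)
    ])
except TopicAlreadyExistsError:
    pass  # safe to ignore on repeated startup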

Partitioning

Key-Based Partitioning

python
# Messages with the same key go to the same partition
producer.send('user-events', key='user-123', value={'action': 'login'})
producer.send('user-events', key='user-123', value={'action': 'logout'})
# Both messages go to the same partition

producer.send('user-events', key='user-456', value={'action': 'purchase'})
# This message goes to a different partition
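
If the default hash-of-key placement doesn't suit your workload, kafka-python accepts a custom partitioner callable. A sketch with a hypothetical region-pinning scheme:

python
import random
import zlib
from kafka import KafkaProducer

def region_partitioner(key_bytes, all_partitions, available):
    # Hypothetical scheme: pin EU-prefixed keys to partition 0; hash everything
    # else with crc32, which is stable across processes (unlike built-in hash())
    if key_bytes is None:
        return random.choice(available or all_partitions)
    if key_bytes.startswith(b'eu-'):
        return all_partitions[0]
    return all_partitions[zlib.crc32(key_bytes) % len(all_partitions)]

producer = KafkaProducer(
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    partitioner=region_partitioner
)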

Interoperability

Kafka → MQTT

Messages produced via Kafka can be consumed by MQTT clients:

python
# Kafka Producer
producer.send('sensors.temperature', {'value': 22.5, 'unit': 'celsius'})
python
# MQTT Consumer
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    print(f"Received: {msg.payload.decode()}")

client = mqtt.Client()
client.on_message = on_message
client.connect("your-namespace.flowmq.io", 1883)
client.subscribe("sensors/temperature")
client.loop_forever()

Kafka → AMQP

Kafka messages can be consumed by AMQP clients:

python
# Kafka Producer
producer.send('orders.eu.new', {'order_id': '12345', 'amount': 99.99})
python
# AMQP Consumer
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('your-namespace.flowmq.io', 5672)
)
channel = connection.channel()

channel.queue_declare(queue='order-processing')
channel.queue_bind(exchange='orders', queue='order-processing', routing_key='eu.new')

def callback(ch, method, properties, body):
    print(f"Received: {body}")

channel.basic_consume(queue='order-processing', on_message_callback=callback, auto_ack=True)
channel.start_consuming()

Offset Management

Committing Offsets

python
from kafka import KafkaConsumer, TopicPartition
from kafka.structs import OffsetAndMetadata

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',
    enable_auto_commit=False
)

for message in consumer:
    try:
        # Process message
        process_order(message.value)

        # Commit the offset of the next record to read for this partition
        tp = TopicPartition(message.topic, message.partition)
        consumer.commit({tp: OffsetAndMetadata(message.offset + 1, None)})

    except Exception as e:
        print(f"Error: {e}")
        # Don't commit; the offset stays uncommitted, so the message is
        # redelivered after a restart or rebalance
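
Committing after every record costs a round trip per message. A common compromise, sketched below with an arbitrary batch size, is to track the highest offset per partition and commit every N records:

python
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata

pending = {}  # next offset to commit, per partition
for count, message in enumerate(consumer, start=1):
    process_order(message.value)
    tp = TopicPartition(message.topic, message.partition)
    pending[tp] = OffsetAndMetadata(message.offset + 1, None)
    if count % 100 == 0:       # commit batch size is arbitrary
        consumer.commit(dict(pending))
        pending.clear()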

Offset Reset Strategies

python
# Start from earliest available message
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',
    auto_offset_reset='earliest'
)

# Start from latest message only
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',
    auto_offset_reset='latest'
)
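
Note that auto_offset_reset only applies when the group has no committed offset. To replay from an explicit position, assign partitions manually and seek; the partition and offset below are illustrative:

python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['your-namespace.flowmq.io:9092'])

# Manual assignment bypasses the consumer group protocol entirely
tp = TopicPartition('orders', 0)
consumer.assign([tp])
consumer.seek(tp, 42)              # replay from offset 42 (illustrative)
# consumer.seek_to_beginning(tp)   # or rewind to the earliest retained offset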

Error Handling

python
import json

from kafka import KafkaProducer
from kafka.errors import KafkaError

# Producer error handling
def on_send_success(record_metadata):
    print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition}")

def on_send_error(excp):
    print(f"Failed to send message: {excp}")

producer = KafkaProducer(
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send with callbacks
future = producer.send('orders', {'order_id': '12345'})
future.add_callback(on_send_success).add_errback(on_send_error)

# Consumer error handling (consumer configured as in Consuming Messages)
try:
    for message in consumer:
        process_message(message)
except KafkaError as e:
    print(f"Kafka error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
finally:
    consumer.close()
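
For records that fail repeatedly, a common pattern is to divert them to a dead-letter topic rather than block the partition. A sketch, assuming a hypothetical orders.dlq topic:

python
# Hypothetical dead-letter handler: retry in-process, then divert the record
# to an 'orders.dlq' topic (topic name and max_attempts are illustrative)
def handle_with_dlq(message, producer, max_attempts=3):
    last_error = None
    for _ in range(max_attempts):
        try:
            process_message(message)   # same handler as above
            return
        except Exception as e:
            last_error = e
    producer.send(
        'orders.dlq',
        value=message.value,
        headers=[('error', str(last_error).encode('utf-8'))]
    )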

Security

SSL/TLS Connection

python
producer = KafkaProducer(
    bootstrap_servers=['your-namespace.flowmq.io:9093'],
    security_protocol='SSL',
    ssl_cafile='ca-cert.pem',
    ssl_certfile='client-cert.pem',
    ssl_keyfile='client-key.pem',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
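
If your namespace uses username/password authentication instead of client certificates, kafka-python's standard SASL settings apply. A sketch with placeholder mechanism and credentials; check your FlowMQ namespace settings for the actual values:

python
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['your-namespace.flowmq.io:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',              # placeholder; use your namespace's mechanism
    sasl_plain_username='your-username', # placeholder credentials
    sasl_plain_password='your-password',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)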

Best Practices

  1. Use Appropriate Partitions

    • More partitions = more parallelism
    • Balance between parallelism and overhead
  2. Handle Consumer Groups Properly

    • Use meaningful group IDs
    • Monitor consumer lag (see the sketch after this list)
    • Implement proper error handling
  3. Manage Offsets Carefully

    • Use auto-commit for simple cases
    • Manual commit for critical applications
    • Handle offset reset scenarios
  4. Use Keys for Ordering

    • Same key = same partition = ordered delivery
    • Choose keys that distribute evenly
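
For the lag monitoring mentioned above, a quick in-process check compares each assigned partition's end offset with the consumer's current position. A minimal sketch:

python
def report_lag(consumer):
    # Call after the consumer has joined the group (i.e., after its first
    # poll); before that, assignment() is empty
    partitions = consumer.assignment()
    end_offsets = consumer.end_offsets(list(partitions))
    for tp in partitions:
        lag = end_offsets[tp] - consumer.position(tp)
        print(f"{tp.topic}[{tp.partition}] lag: {lag}")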

Client Libraries

FlowMQ works with any standard Kafka client library:

  • Python: kafka-python, confluent-kafka-python
  • Java: kafka-clients, spring-kafka
  • Node.js: kafkajs, node-rdkafka
  • Go: sarama, confluent-kafka-go
  • C#: Confluent.Kafka
  • Rust: rdkafka

Connection Parameters

Parameter            Default                          Description
Bootstrap Servers    your-namespace.flowmq.io:9092    Kafka broker addresses
Security Protocol    PLAINTEXT                        Connection security (PLAINTEXT/SSL/SASL)
Group ID             Required for consumers           Consumer group identifier
Auto Offset Reset    latest                           Offset reset strategy
Enable Auto Commit   true                             Automatic offset committing

Next Steps