Kafka Protocol
FlowMQ provides native Apache Kafka protocol support, allowing you to use standard Kafka clients and tools to connect, produce, and consume messages. Kafka in FlowMQ maps to streams for high-throughput, durable message processing with replay capabilities.
Overview
Kafka in FlowMQ maps to the unified routing model where:
- Kafka Topics map to Streams
- Kafka Partitions provide parallel processing
- Kafka Consumer Groups enable load balancing
Connection
Connect to FlowMQ using standard Kafka clients:
```python
import json
from kafka import KafkaProducer, KafkaConsumer
# Producer configuration
producer = KafkaProducer(
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)
# Consumer configuration
consumer = KafkaConsumer(
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='my-consumer-group',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
```
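If you want to verify connectivity before producing traffic, recent kafka-python versions (2.0+) expose a `bootstrap_connected()` check on both client types. A minimal sketch, reusing the producer above:

```python
# Sanity check (kafka-python 2.0+): reports whether the client completed
# its bootstrap handshake with the broker
if not producer.bootstrap_connected():
    raise RuntimeError('Could not reach your-namespace.flowmq.io:9092')
```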
Producing Messages
Basic Producing
```python
import json
# Create producer
producer = KafkaProducer(
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Send message
future = producer.send('orders', {'order_id': '12345', 'amount': 99.99})
record_metadata = future.get(timeout=10)
print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition}")
# Send with key for partitioning
producer.send('user-events', key='user-123', value={'action': 'login'})
# Flush to ensure delivery
producer.flush()
```
Producing with Headers
```python
# Send with headers
headers = [
    ('source', b'order-service'),
    ('version', b'1.0'),
    ('priority', b'high')
]
producer.send(
    'orders',
    value={'order_id': '12345', 'amount': 99.99},
    headers=headers
)
```
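On the consuming side, kafka-python surfaces headers as a list of `(key, value-bytes)` tuples on each record. A minimal sketch, assuming a consumer subscribed to `orders` as in the next section:

```python
for message in consumer:
    # message.headers is a list of (str, bytes) tuples; decode values as needed
    headers = {key: value.decode('utf-8') for key, value in (message.headers or [])}
    print(f"source={headers.get('source')}, priority={headers.get('priority')}")
```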
Consuming Messages
Basic Consuming
```python
from kafka import KafkaConsumer
import json
# Create consumer
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
# Consume messages
for message in consumer:
    print(f"Received: {message.value}")
    print(f"Topic: {message.topic}")
    print(f"Partition: {message.partition}")
    print(f"Offset: {message.offset}")
    # Process the message
```
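The iterator above blocks indefinitely. If you prefer bounded batch reads, kafka-python also offers `poll()`, which returns whatever records arrived within a timeout. A minimal sketch:

```python
while True:
    # poll() returns {TopicPartition: [ConsumerRecord, ...]}, or {} on timeout
    batch = consumer.poll(timeout_ms=1000, max_records=100)
    for tp, records in batch.items():
        for record in records:
            print(f"{tp.topic}[{tp.partition}] offset {record.offset}: {record.value}")
```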
Consumer Groups
```python
# Multiple consumers in the same group
consumer1 = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',  # Same group for load balancing
    auto_offset_reset='earliest'
)
consumer2 = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',  # Same group for load balancing
    auto_offset_reset='earliest'
)
# Each consumer will receive a subset of messages
```
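To observe how partitions are distributed and redistributed across the group, kafka-python lets you subscribe with a `ConsumerRebalanceListener`. A minimal sketch:

```python
from kafka import ConsumerRebalanceListener, KafkaConsumer

class LogRebalances(ConsumerRebalanceListener):
    def on_partitions_revoked(self, revoked):
        print(f"Partitions revoked: {revoked}")

    def on_partitions_assigned(self, assigned):
        print(f"Partitions assigned: {assigned}")

consumer = KafkaConsumer(
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors'
)
consumer.subscribe(['orders'], listener=LogRebalances())
```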
Manual Offset Management
```python
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',
    enable_auto_commit=False,  # Disable auto-commit
    auto_offset_reset='earliest'
)
for message in consumer:
    try:
        # Process message
        process_order(message.value)
        # Manually commit offset
        consumer.commit()
    except Exception as e:
        print(f"Error processing message: {e}")
        # Don't commit on error - message will be reprocessed
```
Topic Management
Creating Topics
```python
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(
    bootstrap_servers=['your-namespace.flowmq.io:9092']
)
# Create topic
topic_list = [
    NewTopic(
        name='orders',
        num_partitions=3,
        replication_factor=1
    )
]
admin_client.create_topics(topic_list)
```
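`KafkaAdminClient` can also inspect and remove topics. A minimal sketch, reusing the client above:

```python
# List the names of existing topics
print(admin_client.list_topics())

# Delete a topic when it is no longer needed (irreversible)
admin_client.delete_topics(['orders'])
```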
Partitioning
Key-Based Partitioning
```python
# Messages with the same key go to the same partition
producer.send('user-events', key='user-123', value={'action': 'login'})
producer.send('user-events', key='user-123', value={'action': 'logout'})
# Both messages go to the same partition
producer.send('user-events', key='user-456', value={'action': 'purchase'})
# This message goes to a different partition
```
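You can confirm this behavior from the broker's record metadata. A quick check, assuming the producer from the Connection section (which serializes string keys):

```python
# Both sends use key 'user-123'; the broker reports the chosen partition
# in the record metadata, so the two partitions must match
m1 = producer.send('user-events', key='user-123', value={'n': 1}).get(timeout=10)
m2 = producer.send('user-events', key='user-123', value={'n': 2}).get(timeout=10)
assert m1.partition == m2.partition
```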
Interoperability
Kafka → MQTT
Messages produced via Kafka can be consumed by MQTT clients:
```python
# Kafka Producer
producer.send('sensors.temperature', {'value': 22.5, 'unit': 'celsius'})
```

```python
# MQTT Consumer
import paho.mqtt.client as mqtt
def on_message(client, userdata, msg):
    print(f"Received: {msg.payload.decode()}")
client = mqtt.Client()
client.on_message = on_message
client.connect("your-namespace.flowmq.io", 1883)
client.subscribe("sensors/temperature")
client.loop_forever()
```

Note that the dotted Kafka topic `sensors.temperature` appears to the MQTT subscriber as the slash-delimited topic `sensors/temperature`, as the two snippets above show.

Kafka → AMQP
Kafka messages can be consumed by AMQP clients:
```python
# Kafka Producer
producer.send('orders.eu.new', {'order_id': '12345', 'amount': 99.99})
```

```python
# AMQP Consumer
import pika
connection = pika.BlockingConnection(
    pika.ConnectionParameters('your-namespace.flowmq.io', 5672)
)
channel = connection.channel()
channel.queue_declare(queue='order-processing')
channel.queue_bind(exchange='orders', queue='order-processing', routing_key='eu.new')
def callback(ch, method, properties, body):
    print(f"Received: {body}")
channel.basic_consume(queue='order-processing', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
```

In this example the Kafka topic `orders.eu.new` is exposed on the AMQP side as the `orders` exchange with routing key `eu.new`, which is what the queue binding above relies on.

Offset Management
Committing Offsets
```python
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',
    enable_auto_commit=False
)
for message in consumer:
    try:
        # Process message
        process_order(message.value)
        # Commit the offset of the next message to consume
        tp = TopicPartition(message.topic, message.partition)
        consumer.commit({tp: OffsetAndMetadata(message.offset + 1, None)})
    except Exception as e:
        print(f"Error: {e}")
        # Don't commit - will reprocess
```
Offset Reset Strategies
```python
# Start from the earliest available message when the group has no committed offset
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',
    auto_offset_reset='earliest'
)
# Start from the latest message only (again, applied when no committed offset exists)
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    group_id='order-processors',
    auto_offset_reset='latest'
)
```
Error Handling
```python
from kafka.errors import KafkaError
# Producer error handling
def on_send_success(record_metadata):
    print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition}")

def on_send_error(excp):
    print(f"Failed to send message: {excp}")
producer = KafkaProducer(
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Send with callbacks
future = producer.send('orders', {'order_id': '12345'})
future.add_callback(on_send_success).add_errback(on_send_error)
# Consumer error handling
try:
    for message in consumer:
        process_message(message)
except KafkaError as e:
    print(f"Kafka error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
finally:
    consumer.close()
```
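Callbacks report failures after the fact; you can also make the producer itself more conservative about what counts as "sent". A sketch of stricter delivery settings (the values here are illustrative, not FlowMQ recommendations):

```python
producer = KafkaProducer(
    bootstrap_servers=['your-namespace.flowmq.io:9092'],
    acks='all',    # wait for the full in-sync replica set to acknowledge
    retries=5,     # retry transient failures before surfacing an error
    linger_ms=10,  # small batching window to amortize request overhead
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
```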
Security
SSL/TLS Connection
```python
producer = KafkaProducer(
    bootstrap_servers=['your-namespace.flowmq.io:9093'],
    security_protocol='SSL',
    ssl_cafile='ca-cert.pem',
    ssl_certfile='client-cert.pem',
    ssl_keyfile='client-key.pem',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
```
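The Connection Parameters table below also lists SASL. If your namespace uses username/password authentication, the kafka-python settings look like this; the mechanism and credentials are placeholders, so check your FlowMQ namespace configuration:

```python
producer = KafkaProducer(
    bootstrap_servers=['your-namespace.flowmq.io:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',  # or e.g. 'SCRAM-SHA-256', per your setup
    sasl_plain_username='your-username',
    sasl_plain_password='your-password',
    ssl_cafile='ca-cert.pem',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
```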
Best Practices
Use Appropriate Partitions
- More partitions = more parallelism
- Balance between parallelism and overhead
Handle Consumer Groups Properly
- Use meaningful group IDs
- Monitor consumer lag (see the sketch below)
- Implement proper error handling
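A minimal lag check with kafka-python, comparing each assigned partition's position against the broker's end offset (topic and group names reuse the examples above):

```python
def print_lag(consumer):
    # Call after the consumer has joined the group (i.e., after polling);
    # lag = broker end offset minus this consumer's current position
    assignment = consumer.assignment()
    end_offsets = consumer.end_offsets(list(assignment))
    for tp in assignment:
        print(f"{tp.topic}[{tp.partition}] lag={end_offsets[tp] - consumer.position(tp)}")
```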
Manage Offsets Carefully
- Use auto-commit for simple cases
- Use manual commits for critical applications
- Handle offset reset scenarios (e.g., by seeking explicitly; see the sketch below)
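When you need to reposition a consumer yourself (for example, to replay a stream from the start), kafka-python provides explicit seek calls. A minimal sketch using a freshly created consumer:

```python
from kafka import TopicPartition

tp = TopicPartition('orders', 0)
consumer.assign([tp])           # manual assignment, outside any consumer group
consumer.seek_to_beginning(tp)  # or consumer.seek(tp, 42) for an exact offset
```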
Use Keys for Ordering
- Same key = same partition = ordered delivery
- Choose keys that distribute evenly
Client Libraries
FlowMQ works with any standard Kafka client library:
- Python: `kafka-python`, `confluent-kafka-python`
- Java: `kafka-clients`, `spring-kafka`
- Node.js: `kafkajs`, `node-rdkafka`
- Go: `sarama`, `confluent-kafka-go`
- C#: `Confluent.Kafka`
- Rust: `rdkafka`
Connection Parameters
| Parameter | Default | Description |
|---|---|---|
| Bootstrap Servers | your-namespace.flowmq.io:9092 | Kafka broker addresses |
| Security Protocol | PLAINTEXT | Connection security (PLAINTEXT/SSL/SASL) |
| Group ID | Required for consumers | Consumer group identifier |
| Auto Offset Reset | latest | Offset reset strategy |
| Enable Auto Commit | true | Automatic offset committing |
Next Steps
- Learn about MQTT Protocol
- Explore AMQP Protocol
- Check SDK Documentation for language-specific guides