Connect FlowMQ with Kafka Ruby Client
FlowMQ's native support for the Apache Kafka protocol lets Ruby developers connect to it with popular libraries such as ruby-kafka. This guide walks through producing messages to FlowMQ and consuming messages from it using this client.
Prerequisites
Before you start, ensure you have the following:
- A running instance of FlowMQ.
- Ruby 2.5 or later installed.
- Bundler for managing gems.
Installation
Add ruby-kafka to your project's Gemfile and then install it using Bundler.
# Gemfile
source "https://rubygems.org"
gem "ruby-kafka"
Then run bundle install from your terminal:
bundle install
Producer: Connecting and Sending Messages
A producer is responsible for sending messages to a topic in FlowMQ.
Configuration and Connection
First, initialize a new Kafka client, specifying the seed_brokers (host:port addresses) for your FlowMQ instance. Then create a producer from the client.
require "kafka"
# Configure the client
kafka = Kafka.new(
  seed_brokers: ["localhost:9092"],
  client_id: "my-ruby-app"
)
# Instantiate a new producer
producer = kafka.producer
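Outside of local testing, the broker list usually comes from configuration rather than a hard-coded string. The following is a minimal sketch of that idea, assuming a hypothetical environment variable FLOWMQ_BROKERS that holds a comma-separated host:port list. The helper name flowmq_client_options is illustrative and not part of ruby-kafka; it only builds the keyword arguments that would be passed to Kafka.new.

```ruby
# Hypothetical helper: build Kafka.new options from a comma-separated broker list.
# FLOWMQ_BROKERS is an assumed variable name, not something FlowMQ defines.
def flowmq_client_options(brokers_string, client_id: "my-ruby-app")
  brokers = brokers_string.to_s.split(",").map(&:strip).reject(&:empty?)
  raise ArgumentError, "no brokers configured" if brokers.empty?

  { seed_brokers: brokers, client_id: client_id }
end

# Fall back to the local address used in this guide when the variable is unset.
options = flowmq_client_options(ENV.fetch("FLOWMQ_BROKERS", "localhost:9092"))
puts options.inspect

# With the ruby-kafka gem loaded, the hash can be splatted into the client:
#   kafka = Kafka.new(**options)
```

Failing early on an empty list keeps a misconfigured deployment from surfacing later as a harder-to-read connection error.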
Sending a Message
Use the produce method to queue a message for a topic. The message is held in the producer's local buffer; nothing is sent until you call deliver_messages, which delivers all buffered messages to FlowMQ.
# Buffer a message for the topic
producer.produce("Hello FlowMQ from Ruby!", topic: "my-topic")
# Deliver all buffered messages
producer.deliver_messages
puts "Message sent successfully!"
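When sending more than one message, it is common to buffer them all, deliver once, and shut the producer down afterwards. The following is a rough sketch of that pattern, not a definitive implementation. The helper name publish_events and the :id field used as the message key are assumptions made for illustration; the produce, deliver_messages, and shutdown calls are the ruby-kafka producer methods. In ruby-kafka the key also drives partition selection by default, so messages with the same key are assigned to the same partition.

```ruby
require "json"

# Sketch: send a batch of hashes as JSON, keyed by a hypothetical :id field.
# `producer` is expected to behave like the object returned by kafka.producer.
def publish_events(producer, topic, events)
  events.each do |event|
    # produce only buffers the message locally
    producer.produce(JSON.generate(event), key: event.fetch(:id).to_s, topic: topic)
  end
  producer.deliver_messages # flush the whole buffer in one call
  events.size
ensure
  producer.shutdown # release broker connections even if delivery raised
end

# Example call, assuming `kafka` was created as shown earlier:
#   publish_events(kafka.producer, "my-topic", [{ id: 1, name: "first" }])
```

Because the helper shuts the producer down, pass it a fresh producer for each batch, or move the shutdown call to application exit if the producer is long-lived.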
Consumer: Connecting and Receiving Messages
A consumer subscribes to topics and processes the messages sent to them.
Configuration and Connection
Create a consumer by specifying the group_id. This ID identifies the consumer group this instance belongs to.
require "kafka"
# Configure the client
kafka = Kafka.new(
  seed_brokers: ["localhost:9092"],
  client_id: "my-ruby-app"
)
# Instantiate a new consumer
consumer = kafka.consumer(group_id: "my-ruby-group")
Subscribing and Consuming
Subscribe the consumer to a topic. The each_message method blocks and processes messages as they are received.
# Subscribe to a topic
consumer.subscribe("my-topic")
# Create a trap to gracefully shut down the consumer
trap("TERM") { consumer.stop }
# Start consuming messages
consumer.each_message do |message|
  puts "Received message:"
  puts " Topic: #{message.topic}"
  puts " Partition: #{message.partition}"
  puts " Offset: #{message.offset}"
  puts " Value: #{message.value}"
end
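Keeping the per-message logic in its own method makes the consumer loop easier to test without a running broker. The following is a minimal sketch under that assumption. The names format_message and run_consumer are hypothetical, and the sketch also traps INT so that Ctrl-C stops the loop the same way TERM does; subscribe, each_message, and stop are the ruby-kafka consumer methods used above.

```ruby
# Sketch: format one message on a single line.
# Works with any object exposing topic, partition, offset and value.
def format_message(message)
  "#{message.topic}[#{message.partition}]@#{message.offset}: #{message.value}"
end

# `consumer` is expected to behave like the object returned by kafka.consumer.
def run_consumer(consumer, topic, out: $stdout)
  consumer.subscribe(topic)
  # Stop on Ctrl-C (INT) as well as TERM.
  %w[INT TERM].each { |signal| trap(signal) { consumer.stop } }
  # Blocks until consumer.stop is called.
  consumer.each_message do |message|
    out.puts format_message(message)
  end
end

# Example call, assuming `kafka` was created as shown earlier:
#   run_consumer(kafka.consumer(group_id: "my-ruby-group"), "my-topic")
```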
Additional Resources
- For more advanced features and detailed documentation, visit the official ruby-kafka repository.
- Explore FlowMQ's advanced features for Kafka integration.