Skip to content

Connect FlowMQ with Kafka Ruby Client

FlowMQ's native support for the Apache Kafka protocol allows Ruby developers to easily connect and interact with it using popular libraries like ruby-kafka. This guide will walk you through the process of producing and consuming messages from FlowMQ using this client.

Prerequisites

Before you start, ensure you have the following:

  • A running instance of FlowMQ.
  • Ruby 2.5 or later installed.
  • Bundler for managing gems.

Installation

Add ruby-kafka to your project's Gemfile and then install it using Bundler.

ruby
# Gemfile
source "https://rubygems.org"

gem "ruby-kafka"

Then, run bundle install from your terminal:

bash
bundle install

Producer: Connecting and Sending Messages

A producer is responsible for sending messages to a topic in FlowMQ.

Configuration and Connection

First, you need to initialize a new Kafka client, specifying the seed_brokers for your FlowMQ instance. Then, you can create a producer from the client.

ruby
require "kafka"

# Configure the client
kafka = Kafka.new(
  seed_brokers: ["localhost:9092"],
  client_id: "my-ruby-app"
)

# Instantiate a new producer
producer = kafka.producer

Sending a Message

Use the produce method to send a message to a topic. The deliver_messages method ensures that all buffered messages are sent.

ruby
# Produce a message
producer.produce("Hello FlowMQ from Ruby!", topic: "my-topic")

# Deliver the message
producer.deliver_messages

puts "Message sent successfully!"

Consumer: Connecting and Receiving Messages

A consumer subscribes to topics and processes the messages sent to them.

Configuration and Connection

Create a consumer by specifying the group_id. This ID uniquely identifies the consumer group this instance belongs to.

ruby
require "kafka"

# Configure the client
kafka = Kafka.new(
  seed_brokers: ["localhost:9092"],
  client_id: "my-ruby-app"
)

# Instantiate a new consumer
consumer = kafka.consumer(group_id: "my-ruby-group")

Subscribing and Consuming

Subscribe the consumer to a topic. The each_message method will block and process messages as they are received.

ruby
# Subscribe to a topic
consumer.subscribe("my-topic")

# Create a trap to gracefully shut down the consumer
trap("TERM") { consumer.stop }

# Start consuming messages
consumer.each_message do |message|
  puts "Received message:"
  puts "  Topic: #{message.topic}"
  puts "  Partition: #{message.partition}"
  puts "  Offset: #{message.offset}"
  puts "  Value: #{message.value}"
end

Additional Resources

  • For more advanced features and detailed documentation, visit the official ruby-kafka repository.
  • Explore FlowMQ's advanced features for Kafka integration.