Connect FlowMQ with Kafka Erlang SDK
FlowMQ's native support for the Apache Kafka protocol means you can use a variety of client libraries to connect and interact with it. This guide will walk you through using brod
, a popular Erlang client for Apache Kafka, to produce and consume messages from FlowMQ.
Prerequisites
Before you start, make sure you have the following:
- A running instance of FlowMQ.
- Erlang/OTP 22 or later installed.
Dependencies
To use brod
, add it to your rebar.config
file as a dependency.
{deps, [
{brod, ".*", {git, "https://github.com/kafka4beam/brod.git", {tag, "0.13.2"}}}
]}.
Then, fetch the dependencies:
rebar3 get-deps
Producer: Connecting and Sending Messages
A producer sends messages to a topic in FlowMQ.
Starting the Client
First, you need to start a brod
client process that connects to your FlowMQ brokers.
-module(my_producer).
-export([start/0, send_message/1]).
start() ->
Endpoints = [{"localhost", 9092}],
ok = brod:start_client(Endpoints, my_client_id),
ok.
send_message(Message) ->
Topic = <<"my_topic">>,
% Using 'rand' lets the client choose the partition
brod:produce_sync(my_client_id, Topic, rand, <<>>, Message).
To run this, you would start an Erlang shell, compile the module, start the client, and then send a message.
1> rebar3 shell
2> c(my_producer).
{ok,my_producer}
3> my_producer:start().
ok
4> my_producer:send_message(<<"Hello FlowMQ from Erlang!">>).
{ok,0,1} % {ok, Partition, Offset}
Consumer: Connecting and Receiving Messages
A consumer subscribes to topics and processes messages. With brod
, you typically implement a consumer group using a brod_group_subscriber
.
Consumer Callback Module
First, create a callback module that will handle the incoming messages.
-module(my_consumer_callback).
-export([init/2, handle_message/4]).
-record(state, {}).
init(_GroupId, _Topics) ->
{ok, #state{}}.
handle_message(Topic, Partition, Offset, Message) ->
io:format("Received message from ~p/~p at offset ~p: ~p~n",
[Topic, Partition, Offset, Message]),
ok. % Return ok to commit the offset
Starting the Consumer Group
Now, you can start the consumer group, pointing it to your callback module.
-module(my_consumer).
-export([start/0]).
start() ->
Endpoints = [{"localhost", 9092}],
GroupId = <<"my_erlang_group">>,
Topics = [<<"my_topic">>],
% Start the client for the consumer
ok = brod:start_client(Endpoints, my_consumer_client),
% Start the group subscriber
{ok, _Sup, _} = brod:start_link_group_subscriber(
self(), % Supervisor
GroupId,
Topics,
brod_gss_config(my_consumer_client), % Get client config
#{callback_module => my_consumer_callback}
),
ok.
brod_gss_config(ClientId) ->
#{
client_id_fun => fun() -> ClientId end,
offset_commit_interval_seconds => 5
}.
To run the consumer, start a shell, compile the modules, and start the consumer supervisor.
1> rebar3 shell
2> c(my_consumer_callback).
{ok, my_consumer_callback}
3> c(my_consumer).
{ok, my_consumer}
4> my_consumer:start().
ok
The consumer will now listen for messages and print them to the console.
Additional Resources
- For more detailed usage and advanced features, check out the official brod repository.
- Explore FlowMQ's advanced features for Kafka integration.