Connect FlowMQ with Kafka Erlang SDK

FlowMQ's native support for the Apache Kafka protocol means you can use a variety of client libraries to connect and interact with it. This guide will walk you through using brod, a popular Erlang client for Apache Kafka, to produce and consume messages from FlowMQ.

Prerequisites

Before you start, make sure you have the following:

  • A running instance of FlowMQ.
  • Erlang/OTP 22 or later installed.

Dependencies

To use brod, add it to your rebar.config file as a dependency.

erlang
{deps, [
    %% Fetches brod from Hex; pin a released version for reproducible builds
    brod
]}.

Then, fetch the dependencies:

bash
rebar3 get-deps

Producer: Connecting and Sending Messages

A producer sends messages to a topic in FlowMQ.

Starting the Client

First, you need to start a brod client process that connects to your FlowMQ brokers.

erlang
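-module(my_producer).
-export([start/0, send_message/1]).

start() ->
    % Make sure the brod application and its dependencies are running
    {ok, _} = application:ensure_all_started(brod),
    Endpoints = [{"localhost", 9092}],
    % auto_start_producers starts a producer for a topic on first use
    ClientConfig = [{auto_start_producers, true}],
    ok = brod:start_client(Endpoints, my_client_id, ClientConfig),
    ok.

send_message(Message) ->
    Topic = <<"my_topic">>,
    % 'random' lets brod pick the partition; the message key is left empty
    brod:produce_sync(my_client_id, Topic, random, <<>>, Message).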

To run this, you would start an Erlang shell, compile the module, start the client, and then send a message.

erlang
$ rebar3 shell
1> c(my_producer).
{ok,my_producer}
2> my_producer:start().
ok
3> my_producer:send_message(<<"Hello FlowMQ from Erlang!">>).
ok
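
To keep related messages on the same partition and get the assigned offset back, brod also accepts the `hash` partitioner and provides `brod:produce_sync_offset/5`. The module below is a minimal sketch under assumptions: `my_keyed_producer` and `key_for/1` are hypothetical names, and a client registered as `my_client_id` is assumed to be running already with a producer available for `my_topic` (for example via `auto_start_producers` or `brod:start_producer/3`).

```erlang
-module(my_keyed_producer).
-export([key_for/1, send/2]).

%% Hypothetical helper: derive a stable message key from an integer user ID.
%% Messages that share a key are hashed to the same partition.
key_for(UserId) when is_integer(UserId) ->
    <<"user-", (integer_to_binary(UserId))/binary>>.

%% Assumes the client my_client_id is already started and can produce
%% to my_topic. Returns {ok, Offset} on success or {error, Reason}.
send(UserId, Value) when is_binary(Value) ->
    brod:produce_sync_offset(my_client_id, <<"my_topic">>, hash,
                             key_for(UserId), Value).
```

Calling `my_keyed_producer:send(42, <<"profile updated">>)` would then publish the value under the key `<<"user-42">>`, so every message for that user lands on one partition and keeps its relative order.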

Consumer: Connecting and Receiving Messages

A consumer subscribes to topics and processes messages. With brod, you typically implement a consumer group using a brod_group_subscriber.

Consumer Callback Module

First, create a callback module that will handle the incoming messages.

erlang
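-module(my_consumer_callback).
-behaviour(brod_group_subscriber).
-export([init/2, handle_message/4]).

% Provides the #kafka_message{} record
-include_lib("brod/include/brod.hrl").

-record(state, {}).

% Called once when the group subscriber starts
init(_GroupId, _InitArg) ->
    {ok, #state{}}.

% Called for every message fetched from an assigned partition
handle_message(Topic, Partition, #kafka_message{offset = Offset, value = Value}, State) ->
    io:format("Received message from ~p/~p at offset ~p: ~p~n",
              [Topic, Partition, Offset, Value]),
    {ok, ack, State}. % Acknowledge the message so its offset can be committed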

Starting the Consumer Group

Now, you can start the consumer group, pointing it to your callback module.

erlang
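-module(my_consumer).
-export([start/0]).

start() ->
    % Make sure the brod application and its dependencies are running
    {ok, _} = application:ensure_all_started(brod),
    Endpoints = [{"localhost", 9092}],
    GroupId = <<"my_erlang_group">>,
    Topics = [<<"my_topic">>],

    % Start the client for the consumer
    ok = brod:start_client(Endpoints, my_consumer_client),

    % Commit acknowledged offsets to the broker every 5 seconds
    GroupConfig = [{offset_commit_policy, commit_to_kafka_v2},
                   {offset_commit_interval_seconds, 5}],
    % Read from the earliest offset when the group has no committed offset
    ConsumerConfig = [{begin_offset, earliest}],

    % Start the group subscriber, linked to the calling process
    {ok, _Pid} = brod:start_link_group_subscriber(
        my_consumer_client,
        GroupId,
        Topics,
        GroupConfig,
        ConsumerConfig,
        my_consumer_callback, % Callback module
        []                    % Passed to my_consumer_callback:init/2
    ),
    ok.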

To run the consumer, start a shell, compile the modules, and start the consumer.

erlang
$ rebar3 shell
1> c(my_consumer_callback).
{ok,my_consumer_callback}
2> c(my_consumer).
{ok,my_consumer}
3> my_consumer:start().
ok

The consumer will now listen for messages and print them to the console.
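
As an alternative to starting clients from code, brod can start them from application configuration when the brod application boots. The fragment below is a sketch of a `sys.config` file under assumptions: it reuses the client names and the `localhost:9092` endpoint from this guide, and the file name and location are up to your project.

```erlang
%% sys.config (sketch): clients listed here are started under brod's own
%% supervisor when the brod application starts.
[
 {brod, [
   {clients, [
     %% Producer client; starts a producer per topic on first use
     {my_client_id, [
       {endpoints, [{"localhost", 9092}]},
       {auto_start_producers, true}
     ]},
     %% Client used by the consumer group subscriber
     {my_consumer_client, [
       {endpoints, [{"localhost", 9092}]}
     ]}
   ]}
 ]}
].
```

One way to load it during development is `rebar3 shell --config sys.config`. Once the brod application is running with this configuration, the clients are available under their registered names without a separate start call.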

Additional Resources

  • For more detailed usage and advanced features, check out the official brod repository.
  • Explore FlowMQ's advanced features for Kafka integration.