Connect FlowMQ with MQTT Erlang Client

FlowMQ's native support for MQTT v5.0 and v3.1.1 makes it easy for Erlang developers to build real-time, distributed systems. This guide provides a complete walkthrough on using the emqtt client to connect, publish messages, and subscribe to topics on FlowMQ.

Prerequisites

Before you begin, ensure you have the following:

  • A running instance of FlowMQ.
  • Erlang/OTP 22 or later installed.
  • rebar3 for managing dependencies.

Dependencies

To get started, add emqtt as a dependency in your rebar.config file.

erlang
{deps, [
    {emqtt, {git, "https://github.com/emqx/emqtt.git", {tag, "1.6.0"}}}
]}.

Then, run rebar3 get-deps to fetch the new dependency.

Connecting to FlowMQ

To connect, you start an emqtt client process. This process will handle the connection to the FlowMQ broker.

erlang
-module(my_mqtt_client).
-export([start/0]).

start() ->
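    % Start the client process, then open the MQTT connection.
    % start_link/1 only spawns the process; connect/1 performs the handshake.
    {ok, Pid} = emqtt:start_link([{host, "localhost"}, {clientid, <<"my_erlang_client">>}]),
    {ok, _Props} = emqtt:connect(Pid),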
    
    io:format("Client started with Pid: ~p~n", [Pid]),
    
    % The process that starts the client will receive messages
    % from subscriptions by default.
    emqtt:subscribe(Pid, <<"flowmq/test">>, qos1),
    
    io:format("Subscribed to 'flowmq/test'~n"),
    
    emqtt:publish(Pid, <<"flowmq/test">>, <<"Hello FlowMQ from Erlang!">>, qos1),
    
    io:format("Published message.~n"),
    
    loop().

loop() ->
    receive
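        % emqtt delivers each incoming message as {publish, Msg}, where Msg is a map
        {publish, #{topic := Topic, payload := Payload}} ->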
            io:format("Received message on ~p: ~p~n", [Topic, Payload]),
            loop(); % Continue listening for more messages
        Any ->
            io:format("Received unknown message: ~p~n", [Any]),
            loop()
    after 5000 -> % Timeout after 5 seconds of inactivity
        io:format("No message received, exiting.~n")
    end.
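
FlowMQ accepts both MQTT v5.0 and v3.1.1 clients, so it can help to spell out the protocol version and port rather than rely on client defaults. The following is a minimal sketch, assuming the emqtt options port, proto_ver, and keepalive, where v4 selects MQTT 3.1.1 and v5 selects MQTT 5.0. The module name flowmq_opts_sketch and the functions client_opts/1 and connect/1 are hypothetical names chosen for illustration.

```erlang
-module(flowmq_opts_sketch).
-export([client_opts/1, connect/1]).

%% Build an emqtt option list for the requested protocol version.
%% v4 selects MQTT 3.1.1 and v5 selects MQTT 5.0.
client_opts(ProtoVer) when ProtoVer =:= v4; ProtoVer =:= v5 ->
    [{host, "localhost"},
     {port, 1883},                        % assumed plain-TCP listener port
     {clientid, <<"my_erlang_client">>},
     {proto_ver, ProtoVer},
     {keepalive, 60}].                    % keepalive interval in seconds

%% Start the client process, then open the MQTT connection.
connect(ProtoVer) ->
    {ok, Pid} = emqtt:start_link(client_opts(ProtoVer)),
    {ok, _Props} = emqtt:connect(Pid),
    {ok, Pid}.
```

Calling flowmq_opts_sketch:connect(v5) would then return {ok, Pid} for an MQTT 5.0 session, provided a broker is listening on localhost:1883.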

Running the Example

To run this example, save the code as my_mqtt_client.erl, start an Erlang shell with rebar3, compile, and run the start/0 function.

erlang
$ rebar3 shell
===> Verifying dependencies...
===> Compiling my_app
1> c(my_mqtt_client).
{ok,my_mqtt_client}
2> my_mqtt_client:start().
Client started with Pid: <0.183.0>
Subscribed to 'flowmq/test'
Published message.
Received message on <<"flowmq/test">>: <<"Hello FlowMQ from Erlang!">>
No message received, exiting.
ok
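
When the receive loop returns, the client process created by start_link is still alive and linked to the shell. A possible cleanup step is sketched below, assuming you kept the client Pid and that emqtt:unsubscribe/2 and emqtt:disconnect/1 behave as in recent emqtt releases. The module name flowmq_shutdown_sketch and the function stop/2 are hypothetical.

```erlang
-module(flowmq_shutdown_sketch).
-export([stop/2]).

%% Unsubscribe from Topic, then close the MQTT session cleanly.
%% Pid is the client process returned by emqtt:start_link/1.
stop(Pid, Topic) when is_pid(Pid), is_binary(Topic) ->
    {ok, _Props, _ReasonCodes} = emqtt:unsubscribe(Pid, Topic),
    ok = emqtt:disconnect(Pid).
```

For the example above, this would be called as flowmq_shutdown_sketch:stop(Pid, <<"flowmq/test">>).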

How It Works

  1. emqtt:start_link: Starts a client process linked to the caller and configured for the FlowMQ broker at localhost on the default port 1883. The MQTT connection itself is opened by emqtt:connect/1.
  2. emqtt:subscribe: Subscribes the client to the flowmq/test topic. By default, messages from this topic will be sent to the process that called start_link.
  3. emqtt:publish: Publishes a message to the same topic.
  4. loop/0: Enters a receive loop that prints each incoming message and keeps listening, then returns once no message has arrived for 5 seconds.
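
The messages that reach the owner process can be classified with a small pure function, which keeps the receive loop short and is easy to test without a broker. This is a sketch under two assumptions: emqtt delivers each incoming message as {publish, Msg} with Msg a map containing topic, payload, and qos, and it sends {disconnected, ReasonCode, Properties} when the broker closes the session. The module name flowmq_msg_sketch and the function describe/1 are hypothetical.

```erlang
-module(flowmq_msg_sketch).
-export([describe/1]).

%% Classify a message received by the process that owns the emqtt client.
describe({publish, #{topic := Topic, payload := Payload, qos := QoS}}) ->
    {message, Topic, Payload, QoS};
describe({disconnected, ReasonCode, _Properties}) ->
    {disconnected, ReasonCode};
describe(Other) ->
    %% Anything else is passed through so the caller can log it.
    {unknown, Other}.
```

Inside a receive loop, each message taken from the mailbox can be passed to describe/1 and handled according to the returned tuple.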

Additional Resources

  • For more advanced features, such as connection options and error handling, refer to the official emqtt repository.
  • Explore FlowMQ's advanced features for MQTT.