Skip to content

Connect FlowMQ with RabbitMQ C Client

FlowMQ natively supports the AMQP 0.9.1 protocol, allowing C and C++ applications to connect using any compliant client library. This guide walks you through using the RabbitMQ C client (rabbitmq-c) to connect to FlowMQ, declare exchanges and queues, publish messages, and consume them.

Prerequisites

Before you begin, ensure you have the following:

  • A running instance of FlowMQ.
  • The FlowMQ AMQP listener endpoint (e.g., your-flowmq-host:5672).
  • A C development environment (e.g., GCC or Clang).
  • The rabbitmq-c library installed on your system.

Installation

You can build and install rabbitmq-c from source:

bash
# Fetch the rabbitmq-c sources.
git clone https://github.com/alanxz/rabbitmq-c.git
cd rabbitmq-c
# Configure and build out-of-tree in ./build.
mkdir build && cd build
cmake ..
cmake --build .
# Install the headers and library (needs root for a system-wide prefix).
sudo cmake --install .

Connecting to FlowMQ

Establish a TCP connection, authenticate, and open a channel. All AMQP operations are performed on the channel.

c
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define HOSTNAME "your-flowmq-host"
#define PORT     5672
#define USERNAME "guest"
#define PASSWORD "guest"
#define VHOST    "/"

/*
 * Abort the program if an AMQP RPC reply reports a failure.
 *
 * reply   - result returned by an rabbitmq-c call or by amqp_get_rpc_reply().
 * context - short description of the operation, used as the message prefix.
 *
 * Returns normally only when reply.reply_type is AMQP_RESPONSE_NORMAL.
 * Otherwise prints the reason to stderr and exits with status 1.
 */
static void die_on_error(amqp_rpc_reply_t reply, const char *context)
{
    switch (reply.reply_type)
    {
    case AMQP_RESPONSE_NORMAL:
        return;

    case AMQP_RESPONSE_NONE:
        fprintf(stderr, "%s failed: missing RPC reply type\n", context);
        break;

    case AMQP_RESPONSE_LIBRARY_EXCEPTION:
        /* Client-side failure (socket error, timeout, protocol error, ...). */
        fprintf(stderr, "%s failed: %s\n", context,
                amqp_error_string2(reply.library_error));
        break;

    case AMQP_RESPONSE_SERVER_EXCEPTION:
        /* The broker closed the connection or channel; show its reason. */
        switch (reply.reply.id)
        {
        case AMQP_CONNECTION_CLOSE_METHOD:
        {
            amqp_connection_close_t *m =
                (amqp_connection_close_t *)reply.reply.decoded;
            /* reply_text is length-delimited, not NUL-terminated. */
            fprintf(stderr, "%s failed: server connection error %u: %.*s\n",
                    context, (unsigned)m->reply_code,
                    (int)m->reply_text.len, (char *)m->reply_text.bytes);
            break;
        }
        case AMQP_CHANNEL_CLOSE_METHOD:
        {
            amqp_channel_close_t *m =
                (amqp_channel_close_t *)reply.reply.decoded;
            fprintf(stderr, "%s failed: server channel error %u: %.*s\n",
                    context, (unsigned)m->reply_code,
                    (int)m->reply_text.len, (char *)m->reply_text.bytes);
            break;
        }
        default:
            fprintf(stderr, "%s failed: unexpected server method 0x%08X\n",
                    context, (unsigned)reply.reply.id);
            break;
        }
        break;

    default:
        fprintf(stderr, "%s failed\n", context);
        break;
    }

    exit(1);
}

/*
 * Connect to FlowMQ over AMQP 0.9.1, open channel 1, then shut down cleanly.
 *
 * Returns 0 on success and 1 if the connection could not be set up or torn
 * down; die_on_error() exits with status 1 on any failed AMQP RPC.
 */
int main(void)
{
    amqp_connection_state_t conn = amqp_new_connection();
    if (conn == NULL)
    {
        fprintf(stderr, "Failed to allocate connection\n");
        return 1;
    }

    /* The socket is owned by conn and is released by amqp_destroy_connection(). */
    amqp_socket_t *socket = amqp_tcp_socket_new(conn);
    if (socket == NULL)
    {
        fprintf(stderr, "Failed to create TCP socket\n");
        amqp_destroy_connection(conn);
        return 1;
    }

    if (amqp_socket_open(socket, HOSTNAME, PORT))
    {
        fprintf(stderr, "Failed to open TCP socket\n");
        amqp_destroy_connection(conn);
        return 1;
    }

    /* Positional arguments after VHOST: channel_max = 0 (no limit),
     * frame_max = 131072 bytes, heartbeat = 0 (disabled). */
    die_on_error(amqp_login(conn, VHOST, 0, 131072, 0,
                            AMQP_SASL_METHOD_PLAIN, USERNAME, PASSWORD),
                 "Login");
    amqp_channel_open(conn, 1);
    die_on_error(amqp_get_rpc_reply(conn), "Opening channel");

    /* ... declare, consume, publish logic here ... */

    /* Close in reverse order of setup and report any failure. */
    die_on_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
                 "Closing channel");
    die_on_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
                 "Closing connection");
    if (amqp_destroy_connection(conn) < 0)
    {
        fprintf(stderr, "Failed to destroy connection\n");
        return 1;
    }
    return 0;
}

Declaring an Exchange and Queue

Messages in AMQP are published to exchanges, which route them to bound queues.

c
/* Wrap the names as amqp_bytes_t values; they are reused by later calls. */
amqp_bytes_t exchange = amqp_cstring_bytes("my_exchange");
amqp_bytes_t queue = amqp_cstring_bytes("my_queue_c");

/* Declare a "direct" exchange on channel 1. Per the rabbitmq-c API the four
 * flags are, in order: passive, durable, auto_delete, internal (all off). */
amqp_exchange_declare(conn, 1, exchange,
                      amqp_cstring_bytes("direct"),
                      0, 0, 0, 0, amqp_empty_table);
/* The outcome of the declare is read back from the connection. */
die_on_error(amqp_get_rpc_reply(conn), "Declaring exchange");

/* Declare the queue on channel 1. Per the rabbitmq-c API the four flags are,
 * in order: passive, durable, exclusive, auto_delete (all off). */
amqp_queue_declare(conn, 1, queue,
                   0, 0, 0, 0, amqp_empty_table);
die_on_error(amqp_get_rpc_reply(conn), "Declaring queue");

Binding the Queue

Bind the queue to the exchange with a routing key so messages can be delivered.

c
/* Bind queue to exchange on channel 1 under the routing key "my_key";
 * no extra binding arguments are supplied. */
amqp_queue_bind(conn, 1, queue, exchange,
                amqp_cstring_bytes("my_key"),
                amqp_empty_table);
/* The outcome of the bind is read back from the connection. */
die_on_error(amqp_get_rpc_reply(conn), "Binding queue");

Registering a Consumer

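Register a consumer on the queue before publishing, so the message is delivered to this connection as soon as it is routed. The consumer is registered with no_ack set, so the client does not need to acknowledge deliveries.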

c
/* Start consuming from queue on channel 1. amqp_empty_bytes leaves the
 * consumer tag unspecified. Per the rabbitmq-c API the three flags are, in
 * order: no_local = 0, no_ack = 1, exclusive = 0. */
amqp_basic_consume(conn, 1, queue, amqp_empty_bytes,
                   0, 1, 0, amqp_empty_table);
/* The outcome of the consume request is read back from the connection. */
die_on_error(amqp_get_rpc_reply(conn), "Registering consumer");

Publishing a Message

Publish a message to the exchange with the routing key used in the binding.

c
amqp_basic_properties_t props;
memset(&props, 0, sizeof(props));
amqp_bytes_t body = amqp_cstring_bytes("Hello FlowMQ from C!");

if (amqp_basic_publish(conn, 1, exchange,
                       amqp_cstring_bytes("my_key"),
                       0, 0, &props, body) < 0)
{
    fprintf(stderr, "Publishing message failed\n");
    return 1;
}
printf(" [x] Sent Hello FlowMQ from C!\n");

Receiving Messages

Read the next message delivered to the consumer. Because no timeout is passed, the call blocks until a message arrives.

c
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
die_on_error(amqp_consume_message(conn, &envelope, NULL, 0),
             "Consuming message");

printf(" [x] Received %.*s\n",
       (int)envelope.message.body.len,
       (char *)envelope.message.body.bytes);
amqp_destroy_envelope(&envelope);

Full Example

Here is a complete program that connects, declares resources, publishes a message, and consumes it.

c
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define HOSTNAME "your-flowmq-host"
#define PORT     5672
#define USERNAME "guest"
#define PASSWORD "guest"
#define VHOST    "/"

/*
 * Abort the program if an AMQP RPC reply reports a failure.
 *
 * reply   - result returned by an rabbitmq-c call or by amqp_get_rpc_reply().
 * context - short description of the operation, used as the message prefix.
 *
 * Returns normally only when reply.reply_type is AMQP_RESPONSE_NORMAL.
 * Otherwise prints the reason to stderr and exits with status 1.
 */
static void die_on_error(amqp_rpc_reply_t reply, const char *context)
{
    switch (reply.reply_type)
    {
    case AMQP_RESPONSE_NORMAL:
        return;

    case AMQP_RESPONSE_NONE:
        fprintf(stderr, "%s failed: missing RPC reply type\n", context);
        break;

    case AMQP_RESPONSE_LIBRARY_EXCEPTION:
        /* Client-side failure (socket error, timeout, protocol error, ...). */
        fprintf(stderr, "%s failed: %s\n", context,
                amqp_error_string2(reply.library_error));
        break;

    case AMQP_RESPONSE_SERVER_EXCEPTION:
        /* The broker closed the connection or channel; show its reason. */
        switch (reply.reply.id)
        {
        case AMQP_CONNECTION_CLOSE_METHOD:
        {
            amqp_connection_close_t *m =
                (amqp_connection_close_t *)reply.reply.decoded;
            /* reply_text is length-delimited, not NUL-terminated. */
            fprintf(stderr, "%s failed: server connection error %u: %.*s\n",
                    context, (unsigned)m->reply_code,
                    (int)m->reply_text.len, (char *)m->reply_text.bytes);
            break;
        }
        case AMQP_CHANNEL_CLOSE_METHOD:
        {
            amqp_channel_close_t *m =
                (amqp_channel_close_t *)reply.reply.decoded;
            fprintf(stderr, "%s failed: server channel error %u: %.*s\n",
                    context, (unsigned)m->reply_code,
                    (int)m->reply_text.len, (char *)m->reply_text.bytes);
            break;
        }
        default:
            fprintf(stderr, "%s failed: unexpected server method 0x%08X\n",
                    context, (unsigned)reply.reply.id);
            break;
        }
        break;

    default:
        fprintf(stderr, "%s failed\n", context);
        break;
    }

    exit(1);
}

/*
 * End-to-end example: connect to FlowMQ, declare and bind an exchange and a
 * queue, register a consumer, publish one message, read it back, and shut
 * down cleanly.
 *
 * Returns 0 on success and 1 if setup, publishing, or teardown fails;
 * die_on_error() exits with status 1 on any failed AMQP RPC.
 */
int main(void)
{
    amqp_connection_state_t conn = amqp_new_connection();
    if (conn == NULL)
    {
        fprintf(stderr, "Failed to allocate connection\n");
        return 1;
    }

    /* The socket is owned by conn and is released by amqp_destroy_connection(). */
    amqp_socket_t *socket = amqp_tcp_socket_new(conn);
    if (socket == NULL)
    {
        fprintf(stderr, "Failed to create TCP socket\n");
        amqp_destroy_connection(conn);
        return 1;
    }

    if (amqp_socket_open(socket, HOSTNAME, PORT))
    {
        fprintf(stderr, "Failed to open TCP socket\n");
        amqp_destroy_connection(conn);
        return 1;
    }

    /* Positional arguments after VHOST: channel_max = 0 (no limit),
     * frame_max = 131072 bytes, heartbeat = 0 (disabled). */
    die_on_error(amqp_login(conn, VHOST, 0, 131072, 0,
                            AMQP_SASL_METHOD_PLAIN, USERNAME, PASSWORD),
                 "Login");
    amqp_channel_open(conn, 1);
    die_on_error(amqp_get_rpc_reply(conn), "Opening channel");

    amqp_bytes_t exchange = amqp_cstring_bytes("my_exchange");
    amqp_bytes_t queue = amqp_cstring_bytes("my_queue_c");
    amqp_bytes_t routing_key = amqp_cstring_bytes("my_key");

    /* Exchange flags, in order: passive, durable, auto_delete, internal. */
    amqp_exchange_declare(conn, 1, exchange,
                          amqp_cstring_bytes("direct"),
                          0, 0, 0, 0, amqp_empty_table);
    die_on_error(amqp_get_rpc_reply(conn), "Declaring exchange");

    /* Queue flags, in order: passive, durable, exclusive, auto_delete. */
    amqp_queue_declare(conn, 1, queue,
                       0, 0, 0, 0, amqp_empty_table);
    die_on_error(amqp_get_rpc_reply(conn), "Declaring queue");

    amqp_queue_bind(conn, 1, queue, exchange,
                    routing_key,
                    amqp_empty_table);
    die_on_error(amqp_get_rpc_reply(conn), "Binding queue");

    /* Consumer flags, in order: no_local = 0, no_ack = 1, exclusive = 0.
     * With no_ack set, deliveries are not acknowledged by this client. */
    amqp_basic_consume(conn, 1, queue, amqp_empty_bytes,
                       0, 1, 0, amqp_empty_table);
    die_on_error(amqp_get_rpc_reply(conn), "Registering consumer");

    /* All-zero properties: no optional message header fields are set. */
    amqp_basic_properties_t props;
    memset(&props, 0, sizeof(props));
    amqp_bytes_t body = amqp_cstring_bytes("Hello FlowMQ from C!");

    /* Publish flags, in order: mandatory = 0, immediate = 0. A negative
     * return value is a library status code describing the failure. */
    int publish_status = amqp_basic_publish(conn, 1, exchange,
                                            routing_key,
                                            0, 0, &props, body);
    if (publish_status < 0)
    {
        fprintf(stderr, "Publishing message failed: %s\n",
                amqp_error_string2(publish_status));
        /* Release the connection (and its socket) before bailing out. */
        amqp_destroy_connection(conn);
        return 1;
    }
    printf(" [x] Sent Hello FlowMQ from C!\n");

    /* Read one delivery; a NULL timeout requests no time limit. */
    amqp_envelope_t envelope;
    amqp_maybe_release_buffers(conn);
    die_on_error(amqp_consume_message(conn, &envelope, NULL, 0),
                 "Consuming message");
    /* The body is a pointer plus length, so print it with an explicit width. */
    printf(" [x] Received %.*s\n",
           (int)envelope.message.body.len,
           (char *)envelope.message.body.bytes);
    amqp_destroy_envelope(&envelope);

    /* Close in reverse order of setup and report any failure. */
    die_on_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
                 "Closing channel");
    die_on_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
                 "Closing connection");
    if (amqp_destroy_connection(conn) < 0)
    {
        fprintf(stderr, "Failed to destroy connection\n");
        return 1;
    }
    return 0;
}

Save the program as amqp_example.c, then compile and run it:

bash
# Build the example and link it against the rabbitmq-c library.
gcc -o amqp_example amqp_example.c -lrabbitmq
# Run it against the host configured in HOSTNAME.
./amqp_example

Additional Resources

  • For more advanced features and examples, refer to the official rabbitmq-c repository.
  • Explore FlowMQ's advanced features for AMQP integration.