Connect FlowMQ with RabbitMQ C Client
FlowMQ natively supports the AMQP 0.9.1 protocol, allowing C and C++ applications to connect using any compliant client library. This guide walks you through using the RabbitMQ C client (rabbitmq-c) to connect to FlowMQ, declare exchanges and queues, publish messages, and consume them.
Prerequisites
Before you begin, ensure you have the following:
- A running instance of FlowMQ.
- The FlowMQ AMQP listener endpoint (e.g., your-flowmq-host:5672).
- A C development environment (e.g., GCC or Clang).
- The
rabbitmq-clibrary installed on your system.
Installation
You can build and install rabbitmq-c from source:
git clone https://github.com/alanxz/rabbitmq-c.git
cd rabbitmq-c
mkdir build && cd build
cmake ..
cmake --build .
sudo cmake --install .Connecting to FlowMQ
Establish a TCP connection, authenticate, and open a channel. All AMQP operations are performed on the channel.
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define HOSTNAME "your-flowmq-host"
#define PORT 5672
#define USERNAME "guest"
#define PASSWORD "guest"
#define VHOST "/"
static void die_on_error(amqp_rpc_reply_t reply, const char *context)
{
if (reply.reply_type != AMQP_RESPONSE_NORMAL)
{
fprintf(stderr, "%s failed\n", context);
exit(1);
}
}
int main(void)
{
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
if (amqp_socket_open(socket, HOSTNAME, PORT))
{
fprintf(stderr, "Failed to open TCP socket\n");
return 1;
}
die_on_error(amqp_login(conn, VHOST, 0, 131072, 0,
AMQP_SASL_METHOD_PLAIN, USERNAME, PASSWORD),
"Login");
amqp_channel_open(conn, 1);
die_on_error(amqp_get_rpc_reply(conn), "Opening channel");
/* ... declare, consume, publish logic here ... */
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}Declaring an Exchange and Queue
Messages in AMQP are published to exchanges, which route them to bound queues.
amqp_bytes_t exchange = amqp_cstring_bytes("my_exchange");
amqp_bytes_t queue = amqp_cstring_bytes("my_queue_c");
amqp_exchange_declare(conn, 1, exchange,
amqp_cstring_bytes("direct"),
0, 0, 0, 0, amqp_empty_table);
die_on_error(amqp_get_rpc_reply(conn), "Declaring exchange");
amqp_queue_declare(conn, 1, queue,
0, 0, 0, 0, amqp_empty_table);
die_on_error(amqp_get_rpc_reply(conn), "Declaring queue");Binding the Queue
Bind the queue to the exchange with a routing key so messages can be delivered.
amqp_queue_bind(conn, 1, queue, exchange,
amqp_cstring_bytes("my_key"),
amqp_empty_table);
die_on_error(amqp_get_rpc_reply(conn), "Binding queue");Registering a Consumer
Register a consumer on the queue before publishing so the message can be received.
amqp_basic_consume(conn, 1, queue, amqp_empty_bytes,
0, 1, 0, amqp_empty_table);
die_on_error(amqp_get_rpc_reply(conn), "Registering consumer");Publishing a Message
Publish a message to the exchange with the routing key used in the binding.
amqp_basic_properties_t props;
memset(&props, 0, sizeof(props));
amqp_bytes_t body = amqp_cstring_bytes("Hello FlowMQ from C!");
if (amqp_basic_publish(conn, 1, exchange,
amqp_cstring_bytes("my_key"),
0, 0, &props, body) < 0)
{
fprintf(stderr, "Publishing message failed\n");
return 1;
}
printf(" [x] Sent Hello FlowMQ from C!\n");Receiving Messages
Read the next message delivered to the consumer.
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
die_on_error(amqp_consume_message(conn, &envelope, NULL, 0),
"Consuming message");
printf(" [x] Received %.*s\n",
(int)envelope.message.body.len,
(char *)envelope.message.body.bytes);
amqp_destroy_envelope(&envelope);Full Example
Here is a complete program that connects, declares resources, publishes a message, and consumes it.
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define HOSTNAME "your-flowmq-host"
#define PORT 5672
#define USERNAME "guest"
#define PASSWORD "guest"
#define VHOST "/"
static void die_on_error(amqp_rpc_reply_t reply, const char *context)
{
if (reply.reply_type != AMQP_RESPONSE_NORMAL)
{
fprintf(stderr, "%s failed\n", context);
exit(1);
}
}
int main(void)
{
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
if (amqp_socket_open(socket, HOSTNAME, PORT))
{
fprintf(stderr, "Failed to open TCP socket\n");
return 1;
}
die_on_error(amqp_login(conn, VHOST, 0, 131072, 0,
AMQP_SASL_METHOD_PLAIN, USERNAME, PASSWORD),
"Login");
amqp_channel_open(conn, 1);
die_on_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_bytes_t exchange = amqp_cstring_bytes("my_exchange");
amqp_bytes_t queue = amqp_cstring_bytes("my_queue_c");
amqp_exchange_declare(conn, 1, exchange,
amqp_cstring_bytes("direct"),
0, 0, 0, 0, amqp_empty_table);
die_on_error(amqp_get_rpc_reply(conn), "Declaring exchange");
amqp_queue_declare(conn, 1, queue,
0, 0, 0, 0, amqp_empty_table);
die_on_error(amqp_get_rpc_reply(conn), "Declaring queue");
amqp_queue_bind(conn, 1, queue, exchange,
amqp_cstring_bytes("my_key"),
amqp_empty_table);
die_on_error(amqp_get_rpc_reply(conn), "Binding queue");
amqp_basic_consume(conn, 1, queue, amqp_empty_bytes,
0, 1, 0, amqp_empty_table);
die_on_error(amqp_get_rpc_reply(conn), "Registering consumer");
amqp_basic_properties_t props;
memset(&props, 0, sizeof(props));
amqp_bytes_t body = amqp_cstring_bytes("Hello FlowMQ from C!");
if (amqp_basic_publish(conn, 1, exchange,
amqp_cstring_bytes("my_key"),
0, 0, &props, body) < 0)
{
fprintf(stderr, "Publishing message failed\n");
return 1;
}
printf(" [x] Sent Hello FlowMQ from C!\n");
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
die_on_error(amqp_consume_message(conn, &envelope, NULL, 0),
"Consuming message");
printf(" [x] Received %.*s\n",
(int)envelope.message.body.len,
(char *)envelope.message.body.bytes);
amqp_destroy_envelope(&envelope);
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}Compile and run:
gcc -o amqp_example amqp_example.c -lrabbitmq
./amqp_exampleAdditional Resources
- For more advanced features and examples, refer to the official rabbitmq-c repository.
- Explore FlowMQ's advanced features for AMQP integration.