Connect FlowMQ with AMQP JavaScript Client
FlowMQ's native support for AMQP 0.9.1 makes it a great choice for Node.js applications that require reliable, asynchronous messaging. This guide will walk you through using amqplib, the most popular AMQP 0.9.1 client for Node.js, to connect to FlowMQ, declare queues, publish messages, and consume them.
Prerequisites
Before you start, ensure you have the following:
- A running instance of FlowMQ.
- Node.js v14 or later installed.
- npm or yarn for package management.
Installation
Add the amqplib package to your project using your preferred package manager.
# Using npm
npm install amqplib
# Or using yarn
yarn add amqplib
Connecting to FlowMQ
First, you need to connect to your FlowMQ broker. Once connected, you can create a channel, which is where most of the API for getting things done resides.
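The connection code in this guide uses the URL amqp://localhost, which relies on the client's defaults. When the broker runs on another host or needs credentials, the URL carries those parts as well. The following is a minimal sketch of assembling such a URL; `buildAmqpUrl` is a hypothetical helper, and the host, port, username, password, and virtual host shown are placeholders rather than FlowMQ defaults.

```javascript
// Sketch: assemble an AMQP 0.9.1 connection URL from its parts.
// buildAmqpUrl is a hypothetical helper; every value passed in is a placeholder.
function buildAmqpUrl({ host = 'localhost', port, username, password, vhost } = {}) {
  // Percent-encode credentials and vhost so characters such as '@' or '/' stay unambiguous.
  const auth = username
    ? `${encodeURIComponent(username)}:${encodeURIComponent(password ?? '')}@`
    : '';
  const portPart = port ? `:${port}` : '';
  const vhostPart = vhost !== undefined ? `/${encodeURIComponent(vhost)}` : '';
  return `amqp://${auth}${host}${portPart}${vhostPart}`;
}

// Example with made-up credentials; the result can be passed to amqp.connect(url).
const url = buildAmqpUrl({
  host: 'localhost',
  port: 5672,
  username: 'app',
  password: 'p@ss',
  vhost: 'orders',
});
console.log(url); // amqp://app:p%40ss@localhost:5672/orders
```

Whether your FlowMQ deployment requires credentials or uses virtual hosts depends on its configuration. The guide's own connection helper, which uses the plain local URL, follows.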
const amqp = require('amqplib');
async function connect() {
  try {
    // Open a connection to the broker, then a channel on top of it.
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    console.log('Connected to FlowMQ!');
    return { connection, channel };
  } catch (error) {
    console.error('Failed to connect to FlowMQ', error);
    process.exit(1);
  }
}
Declaring a Queue
For this example, we'll declare a queue and publish messages directly to it using the default exchange.
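Declaring a queue also returns information about it. In amqplib, `assertQueue` resolves with the queue name together with its current message and consumer counts, which can be handy for a quick sanity check. The sketch below wraps that reply in a hypothetical `describeQueue` helper; it assumes `channel` is an amqplib channel and that FlowMQ reports these counts the way other AMQP 0.9.1 brokers do.

```javascript
// Sketch: declare a queue and summarize the broker's reply.
// describeQueue is a hypothetical helper; `channel` is expected to be an amqplib channel.
async function describeQueue(channel, queueName) {
  // assertQueue resolves with { queue, messageCount, consumerCount }.
  const reply = await channel.assertQueue(queueName, { durable: false });
  return `${reply.queue}: ${reply.messageCount} message(s), ${reply.consumerCount} consumer(s)`;
}
```

With a live channel this could be used as `console.log(await describeQueue(channel, 'hello-js'))`. The guide's simpler setup function, which only needs the queue name, follows.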
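async function setupQueue(channel) {
  const queueName = 'hello-js';
  // Creates the queue if it does not exist yet. With durable: false it is not kept across broker restarts.
  await channel.assertQueue(queueName, { durable: false });
  return queueName;
}
Publishing a Message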
Now, you can publish a message to the queue. The default exchange will route it based on the queue name.
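To make the routing explicit: the default exchange is addressed with an empty exchange name, and the routing key is the queue name. In amqplib, `sendToQueue(queue, content)` is shorthand for `publish('', queue, content)`. The sketch below uses the long form to send a JSON payload; `publishJson` is a hypothetical helper, and the `contentType` property is optional metadata for consumers.

```javascript
// Sketch: publish a JSON payload through the default exchange.
// publishJson is a hypothetical helper; `channel` is expected to be an amqplib channel.
function publishJson(channel, queueName, payload) {
  const body = Buffer.from(JSON.stringify(payload));
  // '' selects the default exchange; the routing key is the queue name.
  // publish returns false when the channel's write buffer is full.
  return channel.publish('', queueName, body, { contentType: 'application/json' });
}
```

A consumer would reverse the encoding with `JSON.parse(msg.content.toString())`. The guide's plain-text publishing helper, which uses the `sendToQueue` shorthand, follows.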
function publishMessage(channel, queueName, message) {
  // sendToQueue publishes through the default exchange, using the queue name as the routing key.
  channel.sendToQueue(queueName, Buffer.from(message));
  console.log(`[x] Sent '${message}'`);
}
Consuming Messages
To consume messages, you can register a consumer on a queue. The callback will be executed for each message that arrives.
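When the callback does real work, two details usually matter: how many unacknowledged messages the broker may deliver at once, and what happens when processing fails. The sketch below limits in-flight messages with `channel.prefetch`, acknowledges on success, and rejects without requeueing on failure. `consumeWithHandler` and `handler` are hypothetical names, and the sketch assumes FlowMQ honors the AMQP 0.9.1 basic.qos and basic.reject methods.

```javascript
// Sketch: consume with a prefetch limit and explicit ack/reject.
// consumeWithHandler is a hypothetical helper; `channel` is expected to be an amqplib channel.
async function consumeWithHandler(channel, queueName, handler, prefetchCount = 10) {
  // Cap the number of unacknowledged messages delivered to this channel.
  await channel.prefetch(prefetchCount);
  return channel.consume(queueName, async (msg) => {
    if (msg === null) return; // the broker cancelled the consumer
    try {
      await handler(msg.content.toString());
      channel.ack(msg); // processed successfully
    } catch (error) {
      console.error('Handler failed, rejecting message', error);
      channel.reject(msg, false); // false: do not requeue
    }
  });
}
```

Rejecting without requeueing avoids a redelivery loop for messages that always fail; pass `true` to `reject` instead if a retry is appropriate. The guide's basic consumer, which acknowledges every message, follows.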
function consumeMessages(channel, queueName) {
  console.log(`[*] Waiting for messages in ${queueName}. To exit press CTRL+C`);
  // Return the promise so callers can handle a failed subscription.
  return channel.consume(queueName, (msg) => {
    // msg is null if the broker cancels the consumer.
    if (msg !== null) {
      console.log(`[x] Received '${msg.content.toString()}'`);
      channel.ack(msg);
    }
  });
}
Full Example
Here are two separate scripts, one for publishing (publisher.js) and one for consuming (consumer.js).
Publisher (publisher.js)
const amqp = require('amqplib');
async function main() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'hello';
  const msg = 'Hello World!';
  await channel.assertQueue(queue, { durable: false });
  channel.sendToQueue(queue, Buffer.from(msg));
  console.log(" [x] Sent %s", msg);
  // Closing the channel first lets the buffered message reach the broker before the connection closes.
  await channel.close();
  await connection.close();
}
main().catch((error) => {
  console.error(error);
  process.exit(1);
});
Consumer (consumer.js)
const amqp = require('amqplib');
async function main() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'hello';
  await channel.assertQueue(queue, { durable: false });
  console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
  // noAck: true means deliveries are not acknowledged by this consumer.
  await channel.consume(queue, (msg) => {
    // msg is null if the broker cancels the consumer.
    if (msg !== null) {
      console.log(" [x] Received %s", msg.content.toString());
    }
  }, { noAck: true });
}
main().catch((error) => {
  console.error(error);
  process.exit(1);
});
Additional Resources
- For more advanced features and a detailed API reference, refer to the official amqplib documentation on GitHub.
- Explore FlowMQ's advanced features for AMQP integration.