Connect FlowMQ with the AMQP JavaScript Client
FlowMQ's native support for AMQP 0.9.1 makes it a great choice for Node.js applications that require reliable, asynchronous messaging. This guide walks you through using `amqplib`, the most popular AMQP 0.9.1 client for Node.js, to connect to FlowMQ, declare queues, publish messages, and consume them.
Prerequisites
Before you start, ensure you have the following:
- A running instance of FlowMQ.
- Node.js v14 or later installed.
- `npm` or `yarn` for package management.
Installation
Add the `amqplib` package to your project using your preferred package manager.
# Using npm
npm install amqplib
# Or using yarn
yarn add amqplib
Connecting to FlowMQ
First, you need to connect to your FlowMQ broker. Once connected, you can create a channel, which is where most of the API for getting things done resides.
const amqp = require('amqplib');
/**
 * Opens a connection to the FlowMQ broker at `amqp://localhost` and creates
 * a channel on it.
 *
 * On any failure the error is logged and the process exits with status 1.
 *
 * @returns {Promise<{connection: object, channel: object}>} The open
 *   connection together with the channel created on it.
 */
async function connect() {
  try {
    const conn = await amqp.connect('amqp://localhost');
    const ch = await conn.createChannel();

    console.log('Connected to FlowMQ!');

    return { connection: conn, channel: ch };
  } catch (err) {
    // Nothing useful can be done without a broker connection, so bail out.
    console.error('Failed to connect to FlowMQ', err);
    process.exit(1);
  }
}
Declaring a Queue
For this example, we'll declare a queue and publish messages directly to it using the default exchange.
/**
 * Declares the non-durable `hello-js` queue on the given channel.
 *
 * @param {object} channel - An open amqplib channel.
 * @returns {Promise<string>} The name of the declared queue.
 */
async function setupQueue(channel) {
  const name = 'hello-js';
  const options = { durable: false };

  await channel.assertQueue(name, options);

  return name;
}
Publishing a Message
Now, you can publish a message to the queue. The default exchange will route it based on the queue name.
/**
 * Sends a text message straight to a queue and logs what was sent.
 *
 * @param {object} channel - An open amqplib channel.
 * @param {string} queueName - Name of the target queue.
 * @param {string} message - Text payload; it is converted to a Buffer.
 * @returns {void}
 */
function publishMessage(channel, queueName, message) {
  const payload = Buffer.from(message);

  channel.sendToQueue(queueName, payload);
  console.log(`[x] Sent '${message}'`);
}
Consuming Messages
To consume messages, you can register a consumer on a queue. The callback will be executed for each message that arrives.
/**
 * Registers a consumer on a queue that logs and acknowledges every message.
 *
 * @param {object} channel - An open amqplib channel.
 * @param {string} queueName - Name of the queue to consume from.
 * @returns {Promise<object>} The promise returned by `channel.consume`, so
 *   callers can await the registration and handle a failure instead of
 *   leaving the promise floating.
 */
function consumeMessages(channel, queueName) {
  console.log(`[*] Waiting for messages in ${queueName}. To exit press CTRL+C`);

  return channel.consume(queueName, (msg) => {
    // amqplib invokes the callback with null when the broker cancels the
    // consumer; there is nothing to read or acknowledge in that case.
    if (msg === null) {
      console.warn('[!] Consumer was cancelled by the broker');
      return;
    }

    console.log(`[x] Received '${msg.content.toString()}'`);
    channel.ack(msg);
  });
}
Full Example
Here are two separate scripts, one for publishing (`publisher.js`) and one for consuming (`consumer.js`).
Publisher (`publisher.js`)
const amqp = require('amqplib');
/**
 * Publishes a single "Hello World!" message to the non-durable `hello`
 * queue, then shuts the channel and connection down.
 *
 * The channel and connection are closed by awaiting their close handshakes
 * rather than by calling `process.exit` from a fixed timer, so shutdown does
 * not depend on an arbitrary delay and the connection is released even when
 * a step fails.
 *
 * @returns {Promise<void>} Resolves once the connection is closed; rejects
 *   with the underlying error if connecting, declaring, or closing fails.
 */
async function main() {
  const connection = await amqp.connect('amqp://localhost');

  try {
    const channel = await connection.createChannel();
    const queue = 'hello';
    const msg = 'Hello World!';

    await channel.assertQueue(queue, { durable: false });
    channel.sendToQueue(queue, Buffer.from(msg));
    console.log(" [x] Sent %s", msg);

    // Close the channel before the connection so the publish issued above is
    // handed to the broker ahead of the close handshake.
    await channel.close();
  } finally {
    // Always release the connection; once it is closed the process can exit
    // on its own without a forced process.exit().
    await connection.close();
  }
}
// Run the publisher. Attach a rejection handler so a failure is reported
// explicitly and the process exits with a non-zero status instead of leaving
// the promise floating.
main().catch((error) => {
  console.error('Publisher failed', error);
  process.exit(1);
});
Consumer (`consumer.js`)
const amqp = require('amqplib');
/**
 * Connects to the broker, declares the non-durable `hello` queue, and logs
 * every message delivered to it. Messages are consumed with `noAck: true`,
 * so no acknowledgement is sent.
 *
 * @returns {Promise<void>} Resolves once the consumer is registered; rejects
 *   if connecting, declaring the queue, or registering the consumer fails.
 */
async function main() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'hello';

  await channel.assertQueue(queue, { durable: false });
  console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);

  // Await the registration so a failure rejects main() instead of being lost.
  await channel.consume(queue, (msg) => {
    // amqplib invokes the callback with null when the broker cancels the
    // consumer; reading msg.content would throw in that case.
    if (msg === null) {
      console.warn(" [!] Consumer was cancelled by the broker");
      return;
    }

    console.log(" [x] Received %s", msg.content.toString());
  }, { noAck: true });
}
// Run the consumer. Attach a rejection handler so a startup failure is
// reported explicitly and the process exits with a non-zero status instead
// of leaving the promise floating.
main().catch((error) => {
  console.error('Consumer failed', error);
  process.exit(1);
});
Additional Resources
- For more advanced features and a detailed API reference, refer to the official amqplib documentation on GitHub.
- Explore FlowMQ's advanced features for AMQP integration.