Skip to content

Connect FlowMQ with AMQP Javascript Client

FlowMQ's native support for AMQP 0.9.1 makes it a great choice for Node.js applications that require reliable, asynchronous messaging. This guide will walk you through using amqplib, the most popular AMQP 0.9.1 client for Node.js, to connect to FlowMQ, declare queues, publish messages, and consume them.

Prerequisites

Before you start, ensure you have the following:

  • A running instance of FlowMQ.
  • Node.js v14 or later installed.
  • npm or yarn for package management.

Installation

Add the amqplib package to your project using your preferred package manager.

bash
# Using npm
npm install amqplib

# Or using yarn
yarn add amqplib

Connecting to FlowMQ

First, you need to connect to your FlowMQ broker. Once connected, you can create a channel, which is where most of the API for getting things done resides.

javascript
const amqp = require('amqplib');

async function connect() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    console.log('Connected to FlowMQ!');
    return { connection, channel };
  } catch (error) {
    console.error('Failed to connect to FlowMQ', error);
    process.exit(1);
  }
}

Declaring a Queue

For this example, we'll declare a queue and publish messages directly to it using the default exchange.

javascript
async function setupQueue(channel) {
  const queueName = 'hello-js';
  await channel.assertQueue(queueName, { durable: false });
  return queueName;
}

Publishing a Message

Now, you can publish a message to the queue. The default exchange will route it based on the queue name.

javascript
function publishMessage(channel, queueName, message) {
  channel.sendToQueue(queueName, Buffer.from(message));
  console.log(`[x] Sent '${message}'`);
}

Consuming Messages

To consume messages, you can register a consumer on a queue. The callback will be executed for each message that arrives.

javascript
function consumeMessages(channel, queueName) {
  console.log(`[*] Waiting for messages in ${queueName}. To exit press CTRL+C`);
  channel.consume(queueName, (msg) => {
    if (msg !== null) {
      console.log(`[x] Received '${msg.content.toString()}'`);
      channel.ack(msg);
    }
  });
}

Full Example

Here are two separate scripts, one for publishing (publisher.js) and one for consuming (consumer.js).

Publisher (publisher.js)

javascript
const amqp = require('amqplib');

async function main() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'hello';
  const msg = 'Hello World!';

  await channel.assertQueue(queue, { durable: false });
  channel.sendToQueue(queue, Buffer.from(msg));
  console.log(" [x] Sent %s", msg);

  setTimeout(() => {
    connection.close();
    process.exit(0);
  }, 500);
}

main();

Consumer (consumer.js)

javascript
const amqp = require('amqplib');

async function main() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'hello';

  await channel.assertQueue(queue, { durable: false });
  console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
  
  channel.consume(queue, (msg) => {
    console.log(" [x] Received %s", msg.content.toString());
  }, { noAck: true });
}

main();

Additional Resources

  • For more advanced features and a detailed API reference, refer to the official amqplib documentation on GitHub.
  • Explore FlowMQ's advanced features for AMQP integration.