Connect FlowMQ with Kafka Node.js Client

FlowMQ natively supports the Apache Kafka protocol, so Node.js applications can connect to it with standard Kafka clients such as kafkajs. This guide walks through producing and consuming messages from FlowMQ using kafkajs.

Prerequisites

Before you start, make sure you have the following:

  • A running instance of FlowMQ.
  • Node.js v14 or later installed.
  • npm or yarn for package management.

Installation

Add the kafkajs package to your Node.js project.

bash
# Using npm
npm install kafkajs

# Or using yarn
yarn add kafkajs

Producer: Connecting and Sending Messages

A producer is responsible for sending messages to a topic in FlowMQ.

Configuration and Connection

First, initialize the KafkaJS client, specifying the clientId and the brokers for your FlowMQ instance. Then, create a producer from the client.

javascript
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const producer = kafka.producer();

const runProducer = async () => {
  // Connecting the producer
  await producer.connect();

  // ...
};

runProducer().catch(console.error);
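
Hard-coding `localhost:9092` works for a local test, but a deployed application usually reads the broker list from its environment. The following is a minimal sketch of that idea. The `FLOWMQ_BROKERS` variable name and the `parseBrokers` helper are assumptions made for illustration; neither is defined by FlowMQ or kafkajs.

```javascript
// Hypothetical helper: turn "host1:9092, host2:9092" into the array
// that the kafkajs `brokers` option expects.
function parseBrokers(raw, fallback = ['localhost:9092']) {
  if (!raw) return fallback;
  const brokers = raw
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
  // Fall back when the variable contained only separators or spaces
  return brokers.length > 0 ? brokers : fallback;
}

// FLOWMQ_BROKERS is an assumed variable name, not a FlowMQ convention.
const brokers = parseBrokers(process.env.FLOWMQ_BROKERS);
console.log('Using brokers:', brokers);
```

The resulting array can be passed straight to the client, for example `new Kafka({ clientId: 'my-app', brokers })`.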

Sending a Message

Use the send method to publish to a topic. The messages field is always an array, so a single call can carry one message or a batch of several.

javascript
  // Inside runProducer, after producer.connect():
  await producer.send({
    topic: 'my-topic',
    messages: [
      { value: 'Hello FlowMQ with KafkaJS!' },
    ],
  });

  console.log('Message sent successfully');

  // Disconnect once everything has been sent
  await producer.disconnect();
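
Besides `value`, a kafkajs message can also carry a `key` and `headers`. In Kafka, messages that share a key are normally routed to the same partition, which preserves their relative order. The sketch below builds such a batch from plain objects. The `toMessages` helper and the event shape (`id`, `payload`) are hypothetical and exist only for this example.

```javascript
// Hypothetical helper: convert application events into kafkajs messages.
// The event shape ({ id, payload }) is assumed for illustration.
function toMessages(events) {
  return events.map((event) => ({
    key: String(event.id),                // used by the partitioner
    value: JSON.stringify(event.payload), // kafkajs accepts a string or Buffer
    headers: { 'content-type': 'application/json' },
  }));
}

const messages = toMessages([
  { id: 1, payload: { status: 'created' } },
  { id: 2, payload: { status: 'shipped' } },
]);
console.log(messages);
```

Inside `runProducer`, the array would be sent with `await producer.send({ topic: 'my-topic', messages })`.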

Consumer: Connecting and Receiving Messages

A consumer subscribes to topics and processes the messages sent to them.

Configuration and Connection

Create a consumer and give it a groupId. The groupId names the consumer group this instance joins; consumers that share a groupId divide the partitions of the subscribed topics among themselves.

javascript
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'my-node-group' });

const runConsumer = async () => {
  // Connecting the consumer
  await consumer.connect();
  
  // ...
};

runConsumer().catch(console.error);
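
Unlike the producer example, a consumer normally keeps running, so it is worth disconnecting it explicitly when the process is asked to stop. The sketch below is one possible way to do that, not a kafkajs feature: `disconnectAll` and `installShutdownHooks` are hypothetical helpers, and they rely only on each client exposing an async `disconnect()` method, as kafkajs producers and consumers do.

```javascript
// Disconnect every client and wait for all of them, even if one fails.
// Returns the number of clients whose disconnect() rejected.
async function disconnectAll(clients) {
  const results = await Promise.allSettled(
    clients.map((client) => client.disconnect())
  );
  return results.filter((result) => result.status === 'rejected').length;
}

// Register once per process. The default signal list is an assumption
// that suits most container and terminal setups.
function installShutdownHooks(clients, signals = ['SIGINT', 'SIGTERM']) {
  for (const signal of signals) {
    process.once(signal, async () => {
      const failures = await disconnectAll(clients);
      process.exit(failures === 0 ? 0 : 1);
    });
  }
}
```

With the consumer above, this would be wired up as `installShutdownHooks([consumer])` before calling `runConsumer()`.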

Subscribing and Consuming

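Subscribe the consumer to a topic, then call the run method to start consuming. The eachMessage handler is invoked once for every message received. Setting fromBeginning to true makes a group that has no committed offset start from the earliest available message instead of the latest.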

javascript
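  // Inside runConsumer, after consumer.connect():
  await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        topic,
        partition,
        offset: message.offset,
        // value is a Buffer, or null for a tombstone record
        value: message.value ? message.value.toString() : null,
      });
    },
  });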
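
The handler receives keys, values, and header values as Buffers, and the key or value may be null. A small decoding step keeps that detail out of the business logic. The sketch below shows one way to do it; `decodeMessage` is a hypothetical helper, and the payload at the bottom is simulated so the example runs without a broker.

```javascript
// Hypothetical helper: turn the payload passed to eachMessage into
// plain strings. Null keys and values are passed through as null.
function decodeMessage({ topic, partition, message }) {
  const headers = {};
  for (const [name, raw] of Object.entries(message.headers || {})) {
    headers[name] = raw == null ? null : raw.toString();
  }
  return {
    topic,
    partition,
    offset: message.offset,
    key: message.key ? message.key.toString() : null,
    value: message.value ? message.value.toString() : null,
    headers,
  };
}

// Simulated payload, shaped like what eachMessage receives.
const decoded = decodeMessage({
  topic: 'my-topic',
  partition: 0,
  message: {
    offset: '42',
    key: Buffer.from('order-1'),
    value: Buffer.from('Hello FlowMQ with KafkaJS!'),
    headers: { source: Buffer.from('docs') },
  },
});
console.log(decoded);
```

In the consumer, the handler could then start with `const record = decodeMessage(payload)` and work with strings from there on.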

Additional Resources

  • For more advanced features and detailed documentation, visit the official KafkaJS website.
  • Explore FlowMQ's advanced features for Kafka integration.