Connect FlowMQ with Kafka Node.js Client
With FlowMQ's native support for the Apache Kafka protocol, Node.js developers can easily connect to it using modern clients like kafkajs
. This guide provides a complete walkthrough for producing and consuming messages from FlowMQ using kafkajs
.
Prerequisites
Before you start, make sure you have the following:
- A running instance of FlowMQ.
- Node.js v14 or later installed.
- npm or yarn for package management.
Installation
Add the kafkajs
package to your Node.js project.
# Using npm
npm install kafkajs
# Or using yarn
yarn add kafkajs
Producer: Connecting and Sending Messages
A producer is responsible for sending messages to a topic in FlowMQ.
Configuration and Connection
First, initialize the KafkaJS client, specifying the clientId
and the brokers
for your FlowMQ instance. Then, create a producer from the client.
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
const producer = kafka.producer();
const runProducer = async () => {
// Connecting the producer
await producer.connect();
// ...
};
runProducer().catch(console.error);
Sending a Message
Use the send
method to publish a message to a topic. You can send a single message or an array of messages.
await producer.send({
topic: 'my-topic',
messages: [
{ value: 'Hello FlowMQ with KafkaJS!' },
],
});
console.log("Message sent successfully");
await producer.disconnect();
Consumer: Connecting and Receiving Messages
A consumer subscribes to topics and processes the messages sent to them.
Configuration and Connection
Create a consumer, making sure to specify a groupId
. This ID uniquely identifies the consumer group this instance belongs to.
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
const consumer = kafka.consumer({ groupId: 'my-node-group' });
const runConsumer = async () => {
// Connecting the consumer
await consumer.connect();
// ...
};
runConsumer().catch(console.error);
Subscribing and Consuming
Subscribe the consumer to a topic and use the run
method to start consuming messages. The eachMessage
handler will be called for every message received.
await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
});
},
});
Additional Resources
- For more advanced features and detailed documentation, visit the official KafkaJS website.
- Explore FlowMQ's advanced features for Kafka integration.