LarryDpk
Published 2024-08-11 / 26 views

Node Kafka Producer And Consumer Example

#JS

To set up a Node.js project where producer.js sends messages to Kafka using a cron job, and consumer.js consumes those messages, follow these steps:

1. Initialize the Node.js Project

First, create a new directory for your project and initialize it with npm:

mkdir node-kafka-example
cd node-kafka-example
npm init -y

2. Install Required Dependencies

You will need kafkajs for interacting with Kafka and node-cron for scheduling the cron job:

npm install kafkajs node-cron

3. Create the Producer Script (producer.js)

This script will use node-cron to schedule a job that sends a message to Kafka at regular intervals:

// producer.js

const { Kafka } = require('kafkajs');
const cron = require('node-cron');

// Initialize Kafka
const kafka = new Kafka({
  clientId: 'my-producer',
  brokers: ['localhost:9092'] // Replace with your Kafka broker addresses
});

// Create a producer instance
const producer = kafka.producer();

// Function to send a message to Kafka
const sendMessage = async () => {
  try {
    await producer.connect();
    await producer.send({
      topic: 'my-topic', // Replace with your Kafka topic
      messages: [{ value: `Message sent at ${new Date().toISOString()}` }],
    });
    console.log('Message sent successfully');
    await producer.disconnect();
  } catch (err) {
    console.error('Error sending message', err);
  }
};

// Schedule a cron job to send a message every 3 seconds
// (six-field expression: the first field is seconds)
cron.schedule('*/3 * * * * *', () => {
  console.log('Cron job running...');
  sendMessage();
});

console.log('Producer running...');
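
Connecting and disconnecting on every tick works, but it repeats the broker handshake every 3 seconds, and a slow send can overlap with the next tick. The following is a minimal sketch of one alternative, assuming you are willing to keep a single connection open for the life of the process. `createTickHandler` is a hypothetical helper (it is not part of KafkaJS): it takes the `producer` created above, connects lazily on the first tick, and skips a tick while the previous send is still in flight.

```javascript
// Hypothetical helper, not part of KafkaJS.
// `producer` is expected to be the object returned by kafka.producer().
function createTickHandler(producer, topic) {
  let connected = false; // becomes true after the first successful connect
  let sending = false;   // true while a send is in flight

  // Resolves to true if a message was sent, false if the tick was skipped.
  return async function onTick() {
    if (sending) {
      return false; // previous tick has not finished yet; skip this one
    }
    sending = true;
    try {
      if (!connected) {
        await producer.connect();
        connected = true;
      }
      await producer.send({
        topic,
        messages: [{ value: `Message sent at ${new Date().toISOString()}` }],
      });
      return true;
    } finally {
      sending = false; // always release the guard, even if the send failed
    }
  };
}
```

To try it, create the handler once with `const tick = createTickHandler(producer, 'my-topic');` and call `tick().catch(console.error)` inside the `cron.schedule` callback instead of `sendMessage()`. In that setup `producer.disconnect()` would be called once, when the process shuts down.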

4. Create the Consumer Script (consumer.js)

This script will consume messages from the Kafka topic:

// consumer.js

const { Kafka } = require('kafkajs');

// Initialize Kafka
const kafka = new Kafka({
  clientId: 'my-consumer',
  brokers: ['localhost:9092'] // Replace with your Kafka broker addresses
});

// Create a consumer instance
const consumer = kafka.consumer({ groupId: 'test-group' });

const run = async () => {
  // Connect the consumer
  await consumer.connect();

  // Subscribe to the topic
  await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });

  // Consume messages
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      });
    },
  });
};

run().catch(console.error);

console.log('Consumer running...');
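
The consumer above converts `message.value` straight to a string. In KafkaJS the value arrives as a `Buffer`, and it can be `null` (for example, a tombstone record on a compacted topic), in which case calling `.toString()` on it would throw. If you later switch from plain strings to structured payloads, a pair of small helpers like the following could sit on either side of the topic. This is only a sketch: `encodeValue` and `decodeValue` are hypothetical names, and JSON is an assumed wire format, not something the original scripts use.

```javascript
// Hypothetical helpers for structured payloads; the names are illustrative only.

// Producer side: serialize an object into a string value for producer.send().
function encodeValue(payload) {
  return JSON.stringify(payload);
}

// Consumer side: turn message.value (a Buffer, or null) back into a JS value.
function decodeValue(message) {
  if (message.value === null || message.value === undefined) {
    return null; // nothing to decode, e.g. a tombstone record
  }
  const text = message.value.toString('utf8');
  try {
    return JSON.parse(text);
  } catch (err) {
    return text; // not valid JSON: fall back to the raw string
  }
}
```

Inside `eachMessage`, `decodeValue(message)` would then replace `message.value.toString()`. The plain-text messages sent by producer.js are not valid JSON, so they come back unchanged as strings.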

5. Running the Producer and Consumer

You can run these scripts independently in separate terminal sessions:

node producer.js
node consumer.js

6. Kafka Setup

Make sure Kafka is running and reachable at the broker address used in both scripts (localhost:9092 here). A local setup needs at least one broker, and the topic (my-topic) must exist unless the broker is configured to create topics automatically.
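
One way to create the topic from Node.js is the KafkaJS admin client. The sketch below assumes a single local broker, which is why it uses one partition and a replication factor of 1; adjust both for a real cluster. `ensureTopic` is a hypothetical helper, and it expects the object returned by `kafka.admin()`.

```javascript
// Hypothetical helper: create the topic only if it does not exist yet.
// `admin` is expected to be the object returned by kafka.admin() in KafkaJS.
// Resolves to true if the topic was created, false if it was already there.
async function ensureTopic(admin, topic) {
  await admin.connect();
  try {
    const existing = await admin.listTopics();
    if (existing.includes(topic)) {
      return false; // already there, nothing to do
    }
    await admin.createTopics({
      // Assumed values for a single-broker development setup.
      topics: [{ topic, numPartitions: 1, replicationFactor: 1 }],
    });
    return true;
  } finally {
    await admin.disconnect(); // release the admin connection in every case
  }
}
```

With the `kafka` instance from either script, `await ensureTopic(kafka.admin(), 'my-topic')` could run once before the producer or consumer starts.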

7. Explanation

  • producer.js: This script uses node-cron to schedule a job that sends a message to the Kafka topic every 3 seconds. The sendMessage function connects to Kafka, sends the message, and then disconnects, so each run opens a fresh connection.

  • consumer.js: This script connects to Kafka as a consumer, subscribes to the specified topic, and logs any messages it receives.

This setup allows you to independently run the producer and consumer, with the producer automatically sending messages at intervals defined by the cron job.
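
The schedule string in producer.js has six fields rather than the usual five. node-cron treats the optional leading field as seconds, so `*/3 * * * * *` fires every 3 seconds, while a five-field expression fires at most once a minute. To make the interval explicit, the expression can be built by small helpers. This is a sketch only: `everySeconds` and `everyMinutes` are hypothetical names, not part of node-cron.

```javascript
// Field order when the optional seconds field is present:
//   second  minute  hour  day-of-month  month  day-of-week

// Hypothetical helper: six-field expression that fires every `n` seconds.
function everySeconds(n) {
  if (!Number.isInteger(n) || n < 1 || n > 59) {
    throw new RangeError('n must be an integer between 1 and 59');
  }
  return `*/${n} * * * * *`;
}

// Hypothetical helper: five-field expression that fires every `n` minutes.
function everyMinutes(n) {
  if (!Number.isInteger(n) || n < 1 || n > 59) {
    throw new RangeError('n must be an integer between 1 and 59');
  }
  return `*/${n} * * * *`;
}
```

For example, `cron.schedule(everySeconds(3), ...)` matches the schedule used in producer.js, and `cron.schedule(everyMinutes(1), ...)` would send one message per minute instead.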

Code

Please check the code in GitHub.